[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464917#comment-15464917
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
I've re-introduced the `ForkableFlinkMiniCluster`. However, this time it 
only starts the normal distributed component classes (e.g. `JobManager` instead 
of `TestingJobManager`, etc.). This implies that the testing messages are no 
longer supported when using the `ForkableFlinkMiniCluster`.

I think the user should not meddle with the testing components in the first 
place. Therefore, it seems fair to offer the user a mini cluster to run 
their tests, but not one which instantiates testing components. The recommended 
cluster is the `LocalFlinkMiniCluster` from now on. However, in order not to 
break all existing test code, we still maintain the `ForkableFlinkMiniCluster`.

Not having to start the testing classes allows us to move them back into the 
test scope of `flink-runtime`, since they are no longer required by the 
`flink-test-utils` module.
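
For users who migrate their tests right away, here is a minimal sketch of a test setup against the `LocalFlinkMiniCluster`. It assumes the constructor takes a `Configuration` plus a single-actor-system flag, as in the current code base; verify the exact signature against the Flink version you build against:

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;

public class LocalMiniClusterExample {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Port 0 lets the OS pick a free port, which is what makes the
        // ForkableFlinkMiniCluster unnecessary (see FLINK-4424 / FLINK-4458).
        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0);
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);

        // Second argument: whether to run all components in a single actor system
        // (assumed constructor shape; check the LocalFlinkMiniCluster sources).
        LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
        try {
            cluster.start();
            // ... submit test jobs against the cluster here ...
        } finally {
            cluster.stop();
        }
    }
}
```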


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.





[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
I've re-introduced the `ForkableFlinkMiniCluster`. However, this time it 
only starts the normal distributed component classes (e.g. `JobManager` instead 
of `TestingJobManager`, etc.). This implies that the testing messages are no 
longer supported when using the `ForkableFlinkMiniCluster`.

I think the user should not meddle with the testing components in the first 
place. Therefore, it seems fair to offer the user a mini cluster to run 
their tests, but not one which instantiates testing components. The recommended 
cluster is the `LocalFlinkMiniCluster` from now on. However, in order not to 
break all existing test code, we still maintain the `ForkableFlinkMiniCluster`.

Not having to start the testing classes allows us to move them back into the 
test scope of `flink-runtime`, since they are no longer required by the 
`flink-test-utils` module.




[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464459#comment-15464459
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
Hi @wangzhijiang999, maybe we should start simple without introducing a 
factory method, because there might actually not be many cases to distinguish. 
Maybe we could rename the `TaskManagerFactory` into `TaskManagerRunner`, which 
has static methods to create the `TaskManager`'s components and does the network 
selection. That way we keep the initialization and the actual `TaskManager` 
logic separated.

For testing purposes I guess we don't need to set up any components because 
they are usually mocked, or one is using testing components. Passing these 
components to the constructor of the `TaskManager` should not be a big deal.

Does this make sense?
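
To make that suggestion a bit more concrete, here is a rough sketch of what such a `TaskManagerRunner` with static helper methods could look like. All names and signatures are purely illustrative (this class does not exist yet); the sketch only shows the split between initialization and the `TaskManager` logic:

```java
import org.apache.flink.configuration.Configuration;

/**
 * Illustrative sketch only: a runner whose static methods create the
 * TaskManager's components and do the network interface selection, so that
 * initialization stays separate from the actual TaskManager logic.
 */
public final class TaskManagerRunner {

    private TaskManagerRunner() {}

    /** Standalone entry point; YARN or test code would call startComponents directly. */
    public static void main(String[] args) throws Exception {
        startComponents(loadConfiguration(args));
    }

    /** Hypothetical helper that loads the configuration (e.g. from flink-conf.yaml). */
    static Configuration loadConfiguration(String[] args) {
        return new Configuration();
    }

    /**
     * Hypothetical helper that selects the network interface, creates the
     * TaskManager services and passes them to the TaskManager constructor.
     */
    static void startComponents(Configuration config) throws Exception {
        // 1. select the bind address / network interface
        // 2. create memory manager, IO manager, network environment, ...
        // 3. construct the TaskManager with these components and start it
    }
}
```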


> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}}, an abstract class with helper methods to 
> bring up the {{TaskManager}}. The factory can be implemented by several 
> classes to start a {{TaskManager}} in different modes (testing, standalone, 
> YARN).





[jira] [Updated] (FLINK-4396) GraphiteReporter class not found at startup of jobmanager

2016-09-05 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4396:
--
Fix Version/s: (was: 1.1.2)
   1.1.3

> GraphiteReporter class not found at startup of jobmanager
> -
>
> Key: FLINK-4396
> URL: https://issues.apache.org/jira/browse/FLINK-4396
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Metrics
>Affects Versions: 1.1.1
> Environment: Windows and Unix
>Reporter: RWenden
> Fix For: 1.1.3
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> For Flink 1.1.1 we configured Graphite metrics settings on the 
> flink-conf.yaml (for job manager (and taskmanager)).
> We see the following error in the log:
> 2016-08-15 14:20:34,167 ERROR org.apache.flink.runtime.metrics.MetricRegistry 
>   - Could not instantiate metrics reporter my_reporter. Metrics 
> might not be exposed/reported.
> java.lang.ClassNotFoundException: 
> org.apache.flink.metrics.graphite.GraphiteReporter
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at 
> org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:119)
> We found out that this class is not packaged inside flink-dist_2.11-1.1.1.jar.
> Long story short: we had to install/provide the following jars in the lib 
> folder to make Graphite metrics work:
> flink-metrics-graphite-1.1.1.jar
> flink-metrics-dropwizard-1.1.1.jar
> metrics-graphite-3.1.0.jar (from dropwizard)
> We think these libraries (and the ones for Ganglia, StatsD, ...) should be 
> included in flink-dist_2.11-1.1.1.jar, for these are needed at manager 
> startup time.
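
For reference, a sketch of the flink-conf.yaml reporter configuration that exercises this code path. The key names below follow the Flink 1.1 metrics documentation as far as we can tell; treat them as an assumption and double-check them against the docs for your version:

```yaml
# flink-conf.yaml (sketch): register a Graphite reporter named "my_reporter".
# The reporter class must be on the JobManager/TaskManager classpath, e.g. by
# copying flink-metrics-graphite, flink-metrics-dropwizard and metrics-graphite
# into the lib/ folder as described above.
metrics.reporters: my_reporter
metrics.reporter.my_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_reporter.host: localhost
metrics.reporter.my_reporter.port: 2003
```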





[jira] [Updated] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-05 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4329:
--
Fix Version/s: (was: 1.1.2)
   1.1.3

> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in the window operator, this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Updated] (FLINK-4513) Kafka connector documentation refers to Flink 1.1-SNAPSHOT

2016-09-05 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4513:
--
Fix Version/s: (was: 1.1.2)
   1.1.3

> Kafka connector documentation refers to Flink 1.1-SNAPSHOT
> --
>
> Key: FLINK-4513
> URL: https://issues.apache.org/jira/browse/FLINK-4513
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.1
>Reporter: Fabian Hueske
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The Kafka connector documentation: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html
>  of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version. 
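
The fix is a pure documentation change: the dependency snippet should point to the released version rather than the snapshot. As an illustration (artifact name shown for the Kafka 0.9 connector with Scala 2.10; adjust to the connector and Scala version actually used):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
  <!-- released version instead of 1.1-SNAPSHOT -->
  <version>1.1.1</version>
</dependency>
```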





[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
I think you should have at least another method `startComponents` which 
starts the different components. Everything else can be added later when we see 
that it would be helpful.




[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464771#comment-15464771
 ] 

ASF GitHub Bot commented on FLINK-4546:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2454#discussion_r77507678
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
 ---
@@ -18,22 +18,11 @@
 
 package org.apache.flink.api.table.plan.schema
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.flink.api.table.FlinkTypeFactory
 import org.apache.flink.streaming.api.datastream.DataStream
 
 class DataStreamTable[T](
 val dataStream: DataStream[T],
 override val fieldIndexes: Array[Int],
 override val fieldNames: Array[String])
   extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
--- End diff --

Because this overridden method is almost the same as the super method, I 
removed it to stay consistent with `DataSetTable`, which does not override the 
`getRowType` method. 


>  Remove STREAM keyword in Stream SQL
> 
>
> Key: FLINK-4546
> URL: https://issues.apache.org/jira/browse/FLINK-4546
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> This issue is about unifying the Batch SQL and Stream SQL grammar, especially 
> removing the STREAM keyword in Stream SQL. 
> Detailed discussion on the mailing list: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html
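
For illustration, the unification means a query on a streaming table is written like a regular query; a sketch with made-up table and column names:

```sql
-- before: Stream SQL required the STREAM keyword
SELECT STREAM user_id, amount FROM Orders WHERE amount > 10;

-- after: the same query uses the unified (batch-style) grammar
SELECT user_id, amount FROM Orders WHERE amount > 10;
```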





[GitHub] flink pull request #2454: [FLINK-4546] [table] Remove STREAM keyword in Stre...

2016-09-05 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2454#discussion_r77507678
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
 ---
@@ -18,22 +18,11 @@
 
 package org.apache.flink.api.table.plan.schema
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.flink.api.table.FlinkTypeFactory
 import org.apache.flink.streaming.api.datastream.DataStream
 
 class DataStreamTable[T](
 val dataStream: DataStream[T],
 override val fieldIndexes: Array[Int],
 override val fieldNames: Array[String])
   extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
--- End diff --

Because this overridden method is almost the same as the super method, I 
removed it to stay consistent with `DataSetTable`, which does not override the 
`getRowType` method. 




[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464772#comment-15464772
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
I think you should have at least another method `startComponents` which 
starts the different components. Everything else can be added later when we see 
that it would be helpful.


> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}}, an abstract class with helper methods to 
> bring up the {{TaskManager}}. The factory can be implemented by several 
> classes to start a {{TaskManager}} in different modes (testing, standalone, 
> YARN).





[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-09-05 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
Thank you. Testing again and merging if nothing fails.




[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464874#comment-15464874
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
Thank you. Testing again and merging if nothing fails.


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  
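
As a rough sketch, the cluster-wide keytab configuration could look like the following in flink-conf.yaml. The key names are an assumption (they have changed across releases), so please check the security documentation of the release you actually run:

```yaml
# flink-conf.yaml (sketch, assumed key names): keytab-based Kerberos login
# for the whole Flink cluster (connectors and ZooKeeper access).
security.keytab: /path/to/flink.keytab
security.principal: flink-user@EXAMPLE.COM
```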





[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2461
  
Hi @wangzhijiang999, maybe we should start simple without introducing a 
factory method, because there might actually not be many cases to distinguish. 
Maybe we could rename the `TaskManagerFactory` into `TaskManagerRunner`, which 
has static methods to create the `TaskManager`'s components and does the network 
selection. That way we keep the initialization and the actual `TaskManager` 
logic separated.

For testing purposes I guess we don't need to set up any components because 
they are usually mocked, or one is using testing components. Passing these 
components to the constructor of the `TaskManager` should not be a big deal.

Does this make sense?




[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464553#comment-15464553
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2450
  
Quick question: The `ForkableFlinkMiniCluster` was something semi-public, 
in the sense that it was part of the `flink-test-utils` project and used by 
quite a few users. So this would be a breaking change.


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.





[GitHub] flink issue #2452: [Flink-4450] add a new module "flink-apache-storm" to sup...

2016-09-05 Thread liuyuzhong
Github user liuyuzhong commented on the issue:

https://github.com/apache/flink/pull/2452
  
The build succeeded:

```
[INFO] BUILD SUCCESS
[INFO] 

[INFO] Total time: 01:28 h
[INFO] Finished at: 2016-09-05T04:07:42+00:00
[INFO] Final Memory: 195M/581M
```

but the Jenkins job finished as UNSTABLE:
```
[JENKINS] Archiving 
/home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-contrib/flink-streaming-contrib/target/flink-streaming-contrib_2.10-1.2-SNAPSHOT-javadoc.jar
 to 
org.apache.flink/flink-streaming-contrib_2.10/1.2-SNAPSHOT/flink-streaming-contrib_2.10-1.2-SNAPSHOT-javadoc.jar
channel stopped
Putting comment on the pull request
Finished: UNSTABLE
```




[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464700#comment-15464700
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
Yes, that is true. However, the `flink-test-utils` have not been guaranteed 
to be stable, if I'm not mistaken. The only change one has to apply is to replace 
the `ForkableFlinkMiniCluster` with the `LocalFlinkMiniCluster`.


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.





[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
Yes, that is true. However, the `flink-test-utils` have not been guaranteed 
to be stable, if I'm not mistaken. The only change one has to apply is to replace 
the `ForkableFlinkMiniCluster` with the `LocalFlinkMiniCluster`.




[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464801#comment-15464801
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77508917
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+public interface TaskExecutionStateListener {
+
+   /**
+* Called whenever the task's execution state changes
+*
+* @param taskExecutionState describing the task execution state change
+*/
+   void notifyTaskExecutionState(TaskExecutionState taskExecutionState);
--- End diff --

Yes, can rename.


> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.





[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77508870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
 ---
@@ -35,4 +35,16 @@
 * task shall not consume any further input splits.
 */
InputSplit getNextInputSplit();
+
+   /**
+* Starts the input split provider with a user code class loader.
+*
+* @param userCodeClassLoader User code class loader to use by the 
input split provider
+*/
+   void start(ClassLoader userCodeClassLoader);
--- End diff --

The `start` method's intention is to pass in the `userCodeClassLoader`, 
which is currently created in the `Task#run` method. There are two other ways to 
solve the problem: either create the user code class loader outside of `Task`, 
where the `InputSplitProvider` is created, or pass the user code class loader 
via the `getNextInputSplit` method call to the input split provider.

For the first approach: Creating the user code class loader is a blocking 
operation, so this would have to be executed in a future, and upon completion we 
could create the `Task` instance in the `TaskManager`.


For the second approach: We would have to touch more code, but I think 
everywhere where the `getNextInputSplit` method is called, we have access to the 
user code class loader.
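
For comparison, a sketch of what the second approach would mean for the interface. This is only an illustration of the alternative, not what the PR currently implements:

```java
package org.apache.flink.runtime.jobgraph.tasks;

import org.apache.flink.core.io.InputSplit;

public interface InputSplitProvider {

    /**
     * Requests the next input split, using the given user code class loader
     * to resolve user-defined split classes (instead of receiving the loader
     * via a separate start() call).
     */
    InputSplit getNextInputSplit(ClassLoader userCodeClassLoader);
}
```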





[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464799#comment-15464799
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77508870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
 ---
@@ -35,4 +35,16 @@
 * task shall not consume any further input splits.
 */
InputSplit getNextInputSplit();
+
+   /**
+* Starts the input split provider with a user code class loader.
+*
+* @param userCodeClassLoader User code class loader to use by the 
input split provider
+*/
+   void start(ClassLoader userCodeClassLoader);
--- End diff --

The `start` method's intention is to pass in the `userCodeClassLoader`, 
which is currently created in the `Task#run` method. There are two other ways to 
solve the problem: either create the user code class loader outside of `Task`, 
where the `InputSplitProvider` is created, or pass the user code class loader 
via the `getNextInputSplit` method call to the input split provider.

For the first approach: Creating the user code class loader is a blocking 
operation, so this would have to be executed in a future, and upon completion we 
could create the `Task` instance in the `TaskManager`.


For the second approach: We would have to touch more code, but I think 
everywhere where the `getNextInputSplit` method is called, we have access to the 
user code class loader.



> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.





[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77508917
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+public interface TaskExecutionStateListener {
+
+   /**
+* Called whenever the task's execution state changes
+*
+* @param taskExecutionState describing the task execution state change
+*/
+   void notifyTaskExecutionState(TaskExecutionState taskExecutionState);
--- End diff --

Yes, can rename.




[GitHub] flink issue #2454: [FLINK-4546] [table] Remove STREAM keyword in Stream SQL

2016-09-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2454
  
Hi @wuchong, thanks for the PR. Just had two minor comments. Thanks, Fabian




[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464574#comment-15464574
 ] 

ASF GitHub Bot commented on FLINK-4546:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2454#discussion_r77494279
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
 ---
@@ -18,22 +18,11 @@
 
 package org.apache.flink.api.table.plan.schema
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.flink.api.table.FlinkTypeFactory
 import org.apache.flink.streaming.api.datastream.DataStream
 
 class DataStreamTable[T](
 val dataStream: DataStream[T],
 override val fieldIndexes: Array[Int],
 override val fieldNames: Array[String])
   extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
--- End diff --

Why did you remove this method?


>  Remove STREAM keyword in Stream SQL
> 
>
> Key: FLINK-4546
> URL: https://issues.apache.org/jira/browse/FLINK-4546
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> This issue is about unifying the Batch SQL and Stream SQL grammar, especially 
> removing the STREAM keyword in Stream SQL. 
> Detailed discussion on the mailing list: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html





[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464573#comment-15464573
 ] 

ASF GitHub Bot commented on FLINK-4546:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2454#discussion_r77494182
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -112,7 +112,7 @@ object FlinkRuleSets {
   */
   val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
 
-  RemoveDeltaRule.INSTANCE,
--- End diff --

The `RemoveDeltaRule` class can be removed as well.


>  Remove STREAM keyword in Stream SQL
> 
>
> Key: FLINK-4546
> URL: https://issues.apache.org/jira/browse/FLINK-4546
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> This issue is about unifying the Batch SQL and Stream SQL grammar, especially 
> removing the STREAM keyword in Stream SQL. 
> Detailed discussion on the mailing list: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html





[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464765#comment-15464765
 ] 

ASF GitHub Bot commented on FLINK-4458:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
I will re-introduce the `ForkableFlinkMiniCluster`, which will simply 
extend the `LocalFlinkMiniCluster`.


> Remove ForkableFlinkMiniCluster
> ---
>
> Key: FLINK-4458
> URL: https://issues.apache.org/jira/browse/FLINK-4458
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> After addressing FLINK-4424 we should be able to get rid of the 
> {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port 
> in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a 
> free port, there should no longer be conflicting port requests. Consequently, 
> the {{ForkableFlinkMiniCluster}} will become obsolete.





[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2450
  
I will re-introduce the `ForkableFlinkMiniCluster`, which will simply 
extend the `LocalFlinkMiniCluster`.




[jira] [Created] (FLINK-4580) Check that the RpcEndpoint supports the specified RpcGateway

2016-09-05 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-4580:


 Summary: Check that the RpcEndpoint supports the specified 
RpcGateway
 Key: FLINK-4580
 URL: https://issues.apache.org/jira/browse/FLINK-4580
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.2.0
Reporter: Till Rohrmann
Priority: Minor


When calling {{RpcService.connect}} the user specifies the type of the 
{{RpcGateway}}. At the moment, it is not checked whether the {{RpcEndpoint}} 
actually supports the specified {{RpcGateway}}.

I think it would be good to add a runtime check that the corresponding 
{{RpcEndpoint}} supports the specified {{RpcGateway}}. If not, then we can let 
the connect method fail fast.
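
A possible shape for such a fail-fast check, sketched with made-up names (not the actual implementation):

```java
/**
 * Illustrative sketch only: verify that the gateway type implemented by the
 * endpoint is compatible with the gateway type requested via RpcService.connect.
 */
final class RpcGatewayCompatibilityCheck {

    private RpcGatewayCompatibilityCheck() {}

    static void verifySupported(Class<?> endpointGatewayType, Class<?> requestedGatewayType) {
        if (!requestedGatewayType.isAssignableFrom(endpointGatewayType)) {
            throw new IllegalArgumentException(
                "The RpcEndpoint's gateway type " + endpointGatewayType.getName()
                    + " does not support the requested gateway " + requestedGatewayType.getName());
        }
    }
}
```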





[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

2016-09-05 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77492066
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+public interface TaskExecutionStateListener {
+
+   /**
+* Called whenever the task's execution state changes
+*
+* @param taskExecutionState describing the task execution state change
+*/
+   void notifyTaskExecutionState(TaskExecutionState taskExecutionState);
--- End diff --

`notifyTaskExecutionStateChanged(...)`?




[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

2016-09-05 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77492004
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
 ---
@@ -35,4 +35,16 @@
 * task shall not consume any further input splits.
 */
InputSplit getNextInputSplit();
+
+   /**
+* Starts the input split provider with a user code class loader.
+*
+* @param userCodeClassLoader User code class loader to use by the 
input split provider
+*/
+   void start(ClassLoader userCodeClassLoader);
--- End diff --

Is it possible to have the `start()` and `stop()` methods not in the base 
interface, but only in the Akka specific implementation?




[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464529#comment-15464529
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2456
  
I think this is good.
A few comments inline on the names, the hardest problem usually ;-)


> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.





[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464544#comment-15464544
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77492066
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+public interface TaskExecutionStateListener {
+
+   /**
+* Called whenever the task's execution state changes
+*
+* @param taskExecutionState describing the task execution state change
+*/
+   void notifyTaskExecutionState(TaskExecutionState taskExecutionState);
--- End diff --

`notifyTaskExecutionStateChanged(...)`?


> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.





[GitHub] flink issue #2471: Broken links on website

2016-09-05 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2471
  
Thank you for reporting this issue. The Apache Flink project uses JIRA to 
track issues, not GitHub pull requests, so it would be nice if you could close 
this PR and file a ticket on 
[JIRA](https://issues.apache.org/jira/browse/FLINK) instead. You would help us 
a lot if you could mention where you found these links ;)




[GitHub] flink pull request #2454: [FLINK-4546] [table] Remove STREAM keyword in Stre...

2016-09-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2454#discussion_r77494279
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala
 ---
@@ -18,22 +18,11 @@
 
 package org.apache.flink.api.table.plan.schema
 
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-import org.apache.flink.api.table.FlinkTypeFactory
 import org.apache.flink.streaming.api.datastream.DataStream
 
 class DataStreamTable[T](
 val dataStream: DataStream[T],
 override val fieldIndexes: Array[Int],
 override val fieldNames: Array[String])
   extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) {
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
--- End diff --

Why did you remove this method?




[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464788#comment-15464788
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77508238
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointNotifier.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.List;
+
+/**
+ * Notifier for checkpoint acknowledge and decline messages in the {@link 
Task}.
+ */
+public interface CheckpointNotifier {
--- End diff --

Oh, I've overlooked this class. Will rename it to `CheckpointResponder`.


> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.





[jira] [Comment Edited] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2016-09-05 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464487#comment-15464487
 ] 

Lorenz Bühmann edited comment on FLINK-2662 at 9/5/16 8:55 AM:
---

[~fhueske] I attached a file. It (hopefully) contains a minimal example that 
leads to the reported exception. While keeping it as minimal as possible, the 
whole logic behind the program got lost - so please don't think about its 
meaning. The Flink version used was 1.1.0, via Maven.


was (Author: lorenzb):
[~fhueske] I attached a file. It contains (hopefully) a minimal example that 
leads to the reported exception. While keeping it as minimal as possible, the 
whole logic behind the program got lost - so please don't think about its 
meaning.

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FlinkBug.scala
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)





[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464542#comment-15464542
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77492004
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java
 ---
@@ -35,4 +35,16 @@
 * task shall not consume any further input splits.
 */
InputSplit getNextInputSplit();
+
+   /**
+* Starts the input split provider with a user code class loader.
+*
+* @param userCodeClassLoader User code class loader to use by the 
input split provider
+*/
+   void start(ClassLoader userCodeClassLoader);
--- End diff --

Is it possible to have the `start()` and `stop()` methods not in the base 
interface, but only in the Akka specific implementation?


> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.





[GitHub] flink pull request #2473: [FLINK-4580] [rpc] Verify that the rpc endpoint su...

2016-09-05 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2473

[FLINK-4580] [rpc] Verify that the rpc endpoint supports the rpc gateway at 
connect time

When calling RpcService.connect, it is checked that the rpc endpoint supports 
the specified rpc gateway. If not, a RpcConnectionException is thrown. The 
verification is implemented as an additional message following the Identify 
message. The reason for this is that the ActorSystem won't wait for the 
Identify message to time out after it has determined that the specified actor 
does not exist. For user-level messages this does not seem to be the case and, 
thus, we would have to wait for the timeout.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink addGatewayEndpointCheck

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2473.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2473


commit 189fb14ddf30726f537f681e800c111ba9bc7c81
Author: Till Rohrmann 
Date:   2016-09-05T10:13:29Z

[FLINK-4580] [rpc] Verify that the rpc endpoint supports the rpc gateway at 
connect time

When calling RpcService.connect it is checked that the rpc endpoint 
supports the specified
rpc gateway. If not, then a RpcConnectionException is thrown. The 
verification is implemented
as an additional message following after the Identify message. The reason 
for this is that
the ActorSystem won't wait for the Identify message to time out after it 
has determined that
the specified actor does not exist. For user-level messages this seems to 
be not the case and,
thus, we would have to wait for the timeout.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2452: [Flink-4450] add a new module "flink-apache-storm"...

2016-09-05 Thread liuyuzhong
Github user liuyuzhong closed the pull request at:

https://github.com/apache/flink/pull/2452


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4580) Check that the RpcEndpoint supports the specified RpcGateway

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464682#comment-15464682
 ] 

ASF GitHub Bot commented on FLINK-4580:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2473

[FLINK-4580] [rpc] Verify that the rpc endpoint supports the rpc gateway at 
connect time

When calling RpcService.connect it is checked that the rpc endpoint supports the specified
rpc gateway. If not, then a RpcConnectionException is thrown. The verification is implemented
as an additional message that follows the Identify message. The reason for this is that
the ActorSystem won't wait for the Identify message to time out once it has determined that
the specified actor does not exist. For user-level messages this does not seem to be the
case, so we would otherwise have to wait for the timeout.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink addGatewayEndpointCheck

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2473.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2473


commit 189fb14ddf30726f537f681e800c111ba9bc7c81
Author: Till Rohrmann 
Date:   2016-09-05T10:13:29Z

[FLINK-4580] [rpc] Verify that the rpc endpoint supports the rpc gateway at 
connect time

When calling RpcService.connect it is checked that the rpc endpoint 
supports the specified
rpc gateway. If not, then a RpcConnectionException is thrown. The 
verification is implemented
as an additional message following after the Identify message. The reason 
for this is that
the ActorSystem won't wait for the Identify message to time out after it 
has determined that
the specified actor does not exist. For user-level messages this seems to 
be not the case and,
thus, we would have to wait for the timeout.




> Check that the RpcEndpoint supports the specified RpcGateway
> 
>
> Key: FLINK-4580
> URL: https://issues.apache.org/jira/browse/FLINK-4580
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> When calling {{RpcService.connect}} the user specifies the type of the 
> {{RpcGateway}}. At the moment, it is not checked whether the {{RpcEndpoint}} 
> actually supports the specified {{RpcGateway}}.
> I think it would be good to add a runtime check that the corresponding 
> {{RpcEndpoint}} supports the specified {{RpcGateway}}. If not, then we can 
> let the connect method fail fast.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464709#comment-15464709
 ] 

ASF GitHub Bot commented on FLINK-4505:
---

Github user wangzhijiang999 commented on the issue:

https://github.com/apache/flink/pull/2461
  
Yes, that also makes sense. For testing purposes it is very clear, so we do not need to do 
anything currently; all the components in the `TaskManager` constructor can be mocked implicitly.

For `TaskManagerRunner`, the purpose is to pull the initialization of the related components 
out of `TaskManager` to make the logic clearer. Just one issue to confirm: should we provide 
several static methods with different parameter sets for the outside world, or just one static 
method such as `selectNetworkInterfaceAndRunTaskManager(Configuration configuration, ResourceID resourceID)`?
I think providing several methods with different parameters may be reasonable, because some 
parameters such as the hostname, port, `RpcService`, and `HighAvailabilityServices` may need to 
be passed in from outside.
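For illustration, the two flavours being discussed could look roughly like the sketch below; the Flink types are replaced by empty stand-ins, and only the `Configuration`/`ResourceID` variant is quoted above, so everything else is an assumption:

    // Hypothetical sketch of the discussed overloads: one convenience entry point
    // that derives everything from the configuration, and one that lets callers
    // inject already-created services (tests, YARN, ...).
    final class TaskManagerRunnerSketch {

        // Stand-ins for the Flink types mentioned in the discussion.
        static class Configuration {}
        static class ResourceID {}
        interface RpcService {}
        interface HighAvailabilityServices {}

        // Convenience variant: derive host/port and services from the configuration.
        static void selectNetworkInterfaceAndRunTaskManager(
                Configuration configuration, ResourceID resourceID) {
            RpcService rpcService = new RpcService() {};                       // would be created from the config
            HighAvailabilityServices haServices = new HighAvailabilityServices() {};
            runTaskManager(configuration, resourceID, rpcService, haServices);
        }

        // Variant for callers that already hold the services.
        static void runTaskManager(
                Configuration configuration,
                ResourceID resourceID,
                RpcService rpcService,
                HighAvailabilityServices haServices) {
            // construct and start the TaskManager components here (omitted)
        }
    }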


> Implement TaskManagerFactory to bring up TaskManager for different modes
> 
>
> Key: FLINK-4505
> URL: https://issues.apache.org/jira/browse/FLINK-4505
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>Priority: Minor
>
> Implement {{TaskExecutorFactory}} that should be an abstract class with the 
> helper methods to bring up the {{TaskManager}}. The factory can be 
> implemented by some classes to start a {{TaskManager}} in different modes 
> (testing, standalone, yarn).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4579) Add StateBackendFactory for RocksDB Backend

2016-09-05 Thread Jark Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464832#comment-15464832
 ] 

Jark Wu commented on FLINK-4579:


+1 for adding RocksDB to the standard distribution lib. So we only need to 
change the flink-dist pom setting, right?

> Add StateBackendFactory for RocksDB Backend
> ---
>
> Key: FLINK-4579
> URL: https://issues.apache.org/jira/browse/FLINK-4579
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>
> Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}} 
> which means that users cannot specify to use the RocksDB backend in the flink 
> configuration.
> If we add a factory for rocksdb we should also think about adding the rocksdb 
> backend to the standard distribution lib, otherwise it is only usable if 
> users manually place the rocks jars in the Flink lib folder.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464910#comment-15464910
 ] 

ASF GitHub Bot commented on FLINK-4456:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2456
  
I've addressed the comments. In order to solve the problem with the user 
code class loader, I decided to pass it into the `InputSplitProvider` via the 
`getNextInputSplit` method. That way, we don't have to know the user code class 
loader when creating the `InputSplitProvider` implementation.
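For illustration, the resulting shape could look roughly like this; the names below (e.g. `RpcInputSplitProviderSketch`) are stand-ins, not the exact Flink signatures:

    // Simplified sketch: the user code class loader is passed per call, so the
    // provider does not need to know it when it is constructed.
    interface InputSplit {}

    interface InputSplitProvider {
        InputSplit getNextInputSplit(ClassLoader userCodeClassLoader);
    }

    class RpcInputSplitProviderSketch implements InputSplitProvider {
        @Override
        public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) {
            // would request the next split from the JobManager and deserialize it
            // with the supplied class loader; omitted in this sketch
            return null;
        }
    }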


> Replace ActorGateway in Task by interface
> -
>
> Key: FLINK-4456
> URL: https://issues.apache.org/jira/browse/FLINK-4456
> Project: Flink
>  Issue Type: Improvement
>  Components: TaskManager
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{Task}} communicates with the outside world ({{JobManager}} and 
> {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on 
> actors.
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} could simply implement these interfaces as part 
> of their RPC contract.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2456: [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvi...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2456
  
I've addressed the comments. In order to solve the problem with the user 
code class loader, I decided to pass it into the `InputSplitProvider` via the 
`getNextInputSplit` method. That way, we don't have to know the user code class 
loader when creating the `InputSplitProvider` implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2016-09-05 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464508#comment-15464508
 ] 

Fabian Hueske commented on FLINK-2662:
--

Thanks a lot [~LorenzB]! This will be very helpful.

I'll try to look into this issue soon but will be quite busy the next couple of 
days.

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FlinkBug.scala
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

2016-09-05 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77491885
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointNotifier.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.List;
+
+/**
+ * Notifier for checkpoint acknowledge and decline messages in the {@link 
Task}.
+ */
+public interface CheckpointNotifier {
--- End diff --

`CheckpointNotifier` is already used for user functions, as an interface to 
get notifications on completed checkpoints.

How about calling this `CheckpointResponder` or something like that instead?
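For reference, a minimal sketch of what such a responder-style interface for the acknowledge/decline messages might look like; the method signatures below are illustrative and simplified (IDs reduced to Strings), not the actual Flink API:

    // Illustrative only: a responder that acknowledges or declines checkpoints
    // on behalf of a Task. Parameter types are simplified stand-ins.
    interface CheckpointResponderSketch {

        /** Acknowledges the given checkpoint for the given task execution. */
        void acknowledgeCheckpoint(String jobId, String executionAttemptId, long checkpointId);

        /** Declines the given checkpoint, e.g. because the task was not ready. */
        void declineCheckpoint(String jobId, String executionAttemptId, long checkpointId, Throwable reason);
    }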


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...

2016-09-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2450
  
Quick question: The `ForkableFlinkMiniCluster` was something semi-public, 
in the sense that it was part of the `flink-test-utils` project and used by 
quite a few users. So this would be a breaking change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2456: [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvi...

2016-09-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2456
  
I think this is good.
A few comments inline on the names, which are usually the hardest problem ;-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4451) Throw exception when remote connection cannot be resolved

2016-09-05 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464545#comment-15464545
 ] 

Till Rohrmann commented on FLINK-4451:
--

Fixed via 0cf2a822b6872ee4f3f0c99f0fcee71affaeaee5

> Throw exception when remote connection cannot be resolved
> -
>
> Key: FLINK-4451
> URL: https://issues.apache.org/jira/browse/FLINK-4451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{RpcService}} implementation should throw an exception (returned in the 
> future) if {{RpcService.connect(address, type)}} cannot connect to the remote 
> {{RpcEndpoint}}.
> At the moment the {{AkkaRpcService}} does not check that the 
> {{IdentifyActor}} message contains a valid {{ActorRef}} and throws due to 
> that a {{NullPointerException}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2454: [FLINK-4546] [table] Remove STREAM keyword in Stre...

2016-09-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2454#discussion_r77494182
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -112,7 +112,7 @@ object FlinkRuleSets {
   */
   val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
 
-  RemoveDeltaRule.INSTANCE,
--- End diff --

The `RemoveDeltaRule` class can be removed as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2454: [FLINK-4546] [table] Remove STREAM keyword in Stream SQL

2016-09-05 Thread wuchong
Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2454
  
Hi @fhueske, thanks for reviewing. I updated this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2456#discussion_r77508238
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointNotifier.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.List;
+
+/**
+ * Notifier for checkpoint acknowledge and decline messages in the {@link 
Task}.
+ */
+public interface CheckpointNotifier {
--- End diff --

Oh I've overlooked this class. Will rename it into `CheckpointResponder`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2016-09-05 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464487#comment-15464487
 ] 

Lorenz Bühmann edited comment on FLINK-2662 at 9/5/16 8:53 AM:
---

[~fhueske] I attached a file. It contains (hopefully) a minimal example that 
leads to the reported exception. While keeping it as minimal as possible, the 
whole logic behind the program got lost - so please don't think about its 
meaning.


was (Author: lorenzb):
A minimal example that leads to the reported exception. While keeping it as 
minimal as possible, the whole logic behind the program got lost - so please 
don't think about its meaning.

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FlinkBug.scala
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464577#comment-15464577
 ] 

ASF GitHub Bot commented on FLINK-4546:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/2454
  
Hi @wuchong, thanks for the PR. Just had two minor comments. Thanks, Fabian


>  Remove STREAM keyword in Stream SQL
> 
>
> Key: FLINK-4546
> URL: https://issues.apache.org/jira/browse/FLINK-4546
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> It is about to unify Batch SQL and Stream SQL grammar, esp. removing STREAM 
> keyword in Stream SQL. 
> detailed discuss mailing list: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."

2016-09-05 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorenz Bühmann updated FLINK-2662:
--
Attachment: FlinkBug.scala

A minimal example that leads to the reported exception. While keeping it as 
minimal as possible, the whole logic behind the program got lost - so please 
don't think about its meaning.

> CompilerException: "Bug: Plan generation for Unions picked a ship strategy 
> between binary plan operators."
> --
>
> Key: FLINK-2662
> URL: https://issues.apache.org/jira/browse/FLINK-2662
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 0.9.1, 0.10.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
> Attachments: FlinkBug.scala
>
>
> I have a Flink program which throws the exception in the jira title. Full 
> text:
> Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: 
> Plan generation for Unions picked a ship strategy between binary plan 
> operators.
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
>   at 
> org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49)
>   at 
> org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41)
>   at 
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>   at 
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520)
>   at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
>   at 
> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202)
>   at 
> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63)
>   at malom.Solver.main(Solver.java:66)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> The execution plan:
> http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt
> (I obtained this by commenting out the line that throws the exception)
> The code is here:
> https://github.com/ggevay/flink/tree/plan-generation-bug
> The class to run is "Solver". It needs a command line argument, which is a 
> directory where it would write output. (On first run, it generates some 
> lookuptables for a few minutes, which are then placed to /tmp/movegen)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464784#comment-15464784
 ] 

ASF GitHub Bot commented on FLINK-4546:
---

Github user wuchong commented on the issue:

https://github.com/apache/flink/pull/2454
  
Hi @fhueske, thanks for reviewing. I updated this PR.


>  Remove STREAM keyword in Stream SQL
> 
>
> Key: FLINK-4546
> URL: https://issues.apache.org/jira/browse/FLINK-4546
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Jark Wu
>Assignee: Jark Wu
>
> It is about to unify Batch SQL and Stream SQL grammar, esp. removing STREAM 
> keyword in Stream SQL. 
> detailed discuss mailing list: 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465479#comment-15465479
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77547550
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseClient.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A client class that serves to create connection and send data to HBase.
+ */
+class HBaseClient implements Closeable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseClient.class);
+
+   private Connection connection;
+   private Table table;
+
+   public HBaseClient(org.apache.hadoop.conf.Configuration hbConfig, 
String tableName) throws IOException {
+   connection = ConnectionFactory.createConnection(hbConfig);
+   table = connection.getTable(TableName.valueOf(tableName));
--- End diff --

I am not able to see the connect method. Maybe it has not been added yet?


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465005#comment-15465005
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77522737
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -131,9 +149,16 @@ public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
 * @return Slot assignment
 */
@RpcMethod
-   public SlotAssignment requestSlot(SlotRequest slotRequest) {
-   System.out.println("SlotRequest: " + slotRequest);
-   return new SlotAssignment();
+   public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
+   final JobID jobId = slotRequest.getJobId();
+   final JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
+
+   if (jobMasterGateway != null) {
+   return slotManager.requestSlot(slotRequest);
+   } else {
+   LOG.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
+   return null;
--- End diff --

Not sure whether we should return `null` here, a negative 
`SlotRequestRegistered` response or throw an exception which will be handled by 
the caller. Why did you choose `null`?
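To make the alternatives concrete, here is a rough sketch of an explicit negative answer instead of `null`; the `SlotRequestDeclined` type and the surrounding shape are hypothetical and not part of the PR:

    // Hypothetical sketch: return an explicit decline for an unknown JobMaster
    // instead of a null that every caller has to guard against.
    interface SlotRequestReply {}

    class SlotRequestRegistered implements SlotRequestReply {}

    class SlotRequestDeclined implements SlotRequestReply {
        final String reason;
        SlotRequestDeclined(String reason) { this.reason = reason; }
    }

    class RequestSlotSketch {
        static SlotRequestReply requestSlot(boolean jobMasterKnown) {
            if (jobMasterKnown) {
                // normal path, in the real code delegated to the SlotManager
                return new SlotRequestRegistered();
            } else {
                return new SlotRequestDeclined("unknown JobMaster for the requested job");
            }
        }
    }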


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77522737
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -131,9 +149,16 @@ public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
 * @return Slot assignment
 */
@RpcMethod
-   public SlotAssignment requestSlot(SlotRequest slotRequest) {
-   System.out.println("SlotRequest: " + slotRequest);
-   return new SlotAssignment();
+   public SlotRequestRegistered requestSlot(SlotRequest slotRequest) {
+   final JobID jobId = slotRequest.getJobId();
+   final JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
+
+   if (jobMasterGateway != null) {
+   return slotManager.requestSlot(slotRequest);
+   } else {
+   LOG.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
+   return null;
--- End diff --

Not sure whether we should return `null` here, a negative 
`SlotRequestRegistered` response or throw an exception which will be handled by 
the caller. Why did you choose `null`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465061#comment-15465061
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2463
  
Thanks for your work @mxm. I've left some comments, which you can find 
inline. I think the implementation of the slot request logic took another step 
in the right direction with this PR.


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-05 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77538038
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -35,109 +46,111 @@
final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
final Map<String, JobMetricStore> jobs = new HashMap<>();
 
-   /**
-* Adds a metric to this MetricStore.
-*
-* @param name  the metric identifier
-* @param value the metric value
-*/
-   public void add(String name, Object value) {
-   TaskManagerMetricStore tm;
-   JobMetricStore job;
-   TaskMetricStore task;
-
+   public void add(MetricDump metric) {
try {
-   String[] components = name.split(":");
-   switch (components[0]) {
-   /**
-* JobManagerMetricStore metric
-* format: 0:.
-*/
-   case "0":
-   jobManager.metrics.put(components[1], 
value);
-   break;
-   /**
-* TaskManager metric
-* format: 1::.
-*/
-   case "1":
-   if (components.length != 3) {
-   break;
-   }
-   tm = taskManagers.get(components[1]);
+   QueryScopeInfo info = metric.scopeInfo;
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   String name = info.scope.isEmpty()
+   ? metric.name
+   : info.scope + "." + metric.name;
+   
+   if (name.isEmpty()) { // malformed transmission
+   return;
+   }
+
+   switch (info.getCategory()) {
+   case INFO_CATEGORY_JM:
--- End diff --

eh, seemed like the proper way of handling it. Also, (up to) 4 comparisons 
vs a jump.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-05 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77547550
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseClient.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A client class that serves to create connection and send data to HBase.
+ */
+class HBaseClient implements Closeable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseClient.class);
+
+   private Connection connection;
+   private Table table;
+
+   public HBaseClient(org.apache.hadoop.conf.Configuration hbConfig, 
String tableName) throws IOException {
+   connection = ConnectionFactory.createConnection(hbConfig);
+   table = connection.getTable(TableName.valueOf(tableName));
--- End diff --

I am not able to see the connect method. Maybe it has not been added yet?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-05 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77547915
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps a input value to a row in HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+* Given an input value return the HBase row key. Row key cannot be 
null.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
--- End diff --

The rest looks good to me. As for the naming of the functions, I am not sure whether it 
can be fine-tuned; I am not very good at naming, and the Flink team may have its own 
naming conventions. Having a connect() method and the comment above are the main points 
from my side. If that is fine, then it is up to the Flink team to review this. 
Thanks @delding.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465487#comment-15465487
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77547840
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps a input value to a row in HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+* Given an input value return the HBase row key. Row key cannot be 
null.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
--- End diff --

Fine with all of that. But how do you relate this rowKey to the one that is passed 
to createMutations? Maybe it would be better to have actions(byte[] rowKey, IN value) 
and to add a getRowKey() API that returns byte[], which could then be used for 
createMutations?
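To illustrate that suggestion, one possible shape is sketched below; it is purely hypothetical, not the API proposed in the PR, and `Object` stands in for HBase's `Mutation` so the snippet stays self-contained:

    import java.io.Serializable;
    import java.util.List;

    // Hypothetical sketch of the suggested split: the mapper exposes the row key
    // separately, and the mutation-building step receives that key explicitly.
    interface HBaseMapperSketch<IN> extends Serializable {

        /** Returns the (non-null) HBase row key for the given input value. */
        byte[] getRowKey(IN value);

        /** Builds the mutations for the value, keyed by the row key computed above. */
        List<Object> actions(byte[] rowKey, IN value);
    }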


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink

2016-09-05 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77547840
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps a input value to a row in HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+* Given an input value return the HBase row key. Row key cannot be 
null.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
--- End diff --

Fine with all of that. But how do you relate this rowKey to the one that is passed 
to createMutations? Maybe it would be better to have actions(byte[] rowKey, IN value) 
and to add a getRowKey() API that returns byte[], which could then be used for 
createMutations?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465491#comment-15465491
 ] 

ASF GitHub Bot commented on FLINK-2055:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2332#discussion_r77547915
  
--- Diff: 
flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.hbase;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Maps a input value to a row in HBase table.
+ *
+ * @param <IN> input type
+ */
+public interface HBaseMapper<IN> extends Function, Serializable {
+
+   /**
+* Given an input value return the HBase row key. Row key cannot be 
null.
+*
+* @param value
+* @return row key
+*/
+   byte[] rowKey(IN value);
--- End diff --

The rest looks good to me. As for the naming of the functions, I am not sure whether it 
can be fine-tuned; I am not very good at naming, and the Flink team may have its own 
naming conventions. Having a connect() method and the comment above are the main points 
from my side. If that is fine, then it is up to the Flink team to review this. 
Thanks @delding.


> Implement Streaming HBaseSink
> -
>
> Key: FLINK-2055
> URL: https://issues.apache.org/jira/browse/FLINK-2055
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming, Streaming Connectors
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Erli Ding
>
> As per : 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allocation ...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2463
  
Thanks for your work @mxm. I've left some comments, which you can find 
inline. I think the implementation of the slot request logic took another step 
in the right direction with this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465054#comment-15465054
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524626
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

Why do you create a registry for a single field?
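For comparison, the single field could also be kept in a plain thread-safe holder; the sketch below is illustrative only and not a class from the PR:

    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch: a single AtomicReference covers the "registry for one field" case.
    class LeaderIdHolder {
        private final AtomicReference<UUID> currentLeaderId = new AtomicReference<>();

        void setLeaderId(UUID leaderId) {
            currentLeaderId.set(leaderId);
        }

        UUID getLeaderId() {
            return currentLeaderId.get();
        }
    }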


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524626
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java
 ---
@@ -15,11 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.flink.runtime.highavailability;
 
-package org.apache.flink.runtime.resourcemanager;
+import java.util.UUID;
 
-import java.io.Serializable;
+/**
+ * Registry class to keep track of the current leader ID.
+ */
+public class LeaderIdRegistry {
--- End diff --

Why do you create a registry for a single field?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77527822
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+/**
+ * A container for a dumped metric that contains the scope, name and 
value(s) of the metric.
+ */
+public abstract class MetricDump {
+   /** Categories to be returned by {@link MetricDump#getCategory()} to 
avoid instanceof checks. */
+   public static final byte METRIC_CATEGORY_COUNTER = 0;
+   public static final byte METRIC_CATEGORY_GAUGE = 1;
+   public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+
+   /** The scope information for the stored metric. */
+   public final QueryScopeInfo scopeInfo;
+   /** The name of the stored metric. */
+   public final String name;
+
+   private MetricDump(QueryScopeInfo scopeInfo, String name) {
+   this.scopeInfo = scopeInfo;
+   this.name = name;
+   }
+
+   /**
+* Returns the category for this MetricDump.
+*
+* @return category
+*/
+   public abstract byte getCategory();
--- End diff --

I think we don't need the explicit category information, because it is 
already encoded in the sub-types of `MetricDump`.
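As a simplified illustration of "encoded in the sub-types": the concrete subclass already identifies the kind of metric, so the byte constants would be redundant. The hierarchy below is a stripped-down sketch, not the real `MetricDump` class.

```java
// Hypothetical sketch: the metric kind is expressed purely through the
// subclass, so no METRIC_CATEGORY_* byte is needed.
public abstract class MetricDumpSketch {

    public static final class CounterDump extends MetricDumpSketch {
        public final long count;
        public CounterDump(long count) { this.count = count; }
    }

    public static final class GaugeDump extends MetricDumpSketch {
        public final String value;
        public GaugeDump(String value) { this.value = value; }
    }

    public static final class HistogramDump extends MetricDumpSketch {
        public final double p99;
        public HistogramDump(double p99) { this.p99 = p99; }
    }
}
```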


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465108#comment-15465108
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77527854
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+/**
+ * A container for a dumped metric that contains the scope, name and 
value(s) of the metric.
+ */
+public abstract class MetricDump {
+   /** Categories to be returned by {@link MetricDump#getCategory()} to 
avoid instanceof checks. */
+   public static final byte METRIC_CATEGORY_COUNTER = 0;
+   public static final byte METRIC_CATEGORY_GAUGE = 1;
+   public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+
+   /** The scope information for the stored metric. */
+   public final QueryScopeInfo scopeInfo;
+   /** The name of the stored metric. */
+   public final String name;
+
+   private MetricDump(QueryScopeInfo scopeInfo, String name) {
+   this.scopeInfo = scopeInfo;
+   this.name = name;
--- End diff --

`checkNotNull` missing
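A sketch of the requested null checks; `Objects.requireNonNull` stands in here for the usual `checkNotNull` helper so that the snippet is self-contained, and the field types are simplified placeholders.

```java
import java.util.Objects;

// Hypothetical sketch of the constructor with the null checks the review asks for.
public abstract class MetricDumpNullCheckSketch {

    public final Object scopeInfo; // stands in for QueryScopeInfo
    public final String name;

    protected MetricDumpNullCheckSketch(Object scopeInfo, String name) {
        this.scopeInfo = Objects.requireNonNull(scopeInfo, "scopeInfo must not be null");
        this.name = Objects.requireNonNull(name, "name must not be null");
    }
}
```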


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465106#comment-15465106
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77527822
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+/**
+ * A container for a dumped metric that contains the scope, name and 
value(s) of the metric.
+ */
+public abstract class MetricDump {
+   /** Categories to be returned by {@link MetricDump#getCategory()} to 
avoid instanceof checks. */
+   public static final byte METRIC_CATEGORY_COUNTER = 0;
+   public static final byte METRIC_CATEGORY_GAUGE = 1;
+   public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+
+   /** The scope information for the stored metric. */
+   public final QueryScopeInfo scopeInfo;
+   /** The name of the stored metric. */
+   public final String name;
+
+   private MetricDump(QueryScopeInfo scopeInfo, String name) {
+   this.scopeInfo = scopeInfo;
+   this.name = name;
+   }
+
+   /**
+* Returns the category for this MetricDump.
+*
+* @return category
+*/
+   public abstract byte getCategory();
--- End diff --

I think we don't need the explicit category information, because it is 
already encoded in the sub-types of `MetricDump`.


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77527854
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+/**
+ * A container for a dumped metric that contains the scope, name and 
value(s) of the metric.
+ */
+public abstract class MetricDump {
+   /** Categories to be returned by {@link MetricDump#getCategory()} to 
avoid instanceof checks. */
+   public static final byte METRIC_CATEGORY_COUNTER = 0;
+   public static final byte METRIC_CATEGORY_GAUGE = 1;
+   public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+
+   /** The scope information for the stored metric. */
+   public final QueryScopeInfo scopeInfo;
+   /** The name of the stored metric. */
+   public final String name;
+
+   private MetricDump(QueryScopeInfo scopeInfo, String name) {
+   this.scopeInfo = scopeInfo;
+   this.name = name;
--- End diff --

`checkNotNull` missing


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2363
  
I think the changes look good. Thanks for your work @zentol :-) I only had a minor question about whether we can replace the explicit category information with the type information of the metric dumps and the `QueryScopeInfo` instances (not for serialization, but in the `MetricStore`).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77540202
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -35,109 +46,111 @@
final Map taskManagers = new 
HashMap<>();
final Map jobs = new HashMap<>();
 
-   /**
-* Adds a metric to this MetricStore.
-*
-* @param name  the metric identifier
-* @param value the metric value
-*/
-   public void add(String name, Object value) {
-   TaskManagerMetricStore tm;
-   JobMetricStore job;
-   TaskMetricStore task;
-
+   public void add(MetricDump metric) {
try {
-   String[] components = name.split(":");
-   switch (components[0]) {
-   /**
-* JobManagerMetricStore metric
-* format: 0:.
-*/
-   case "0":
-   jobManager.metrics.put(components[1], 
value);
-   break;
-   /**
-* TaskManager metric
-* format: 1::.
-*/
-   case "1":
-   if (components.length != 3) {
-   break;
-   }
-   tm = taskManagers.get(components[1]);
+   QueryScopeInfo info = metric.scopeInfo;
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   String name = info.scope.isEmpty()
+   ? metric.name
+   : info.scope + "." + metric.name;
+   
+   if (name.isEmpty()) { // malformed transmission
+   return;
+   }
+
+   switch (info.getCategory()) {
+   case INFO_CATEGORY_JM:
--- End diff --

That is true. Performance-wise it is no doubt the more efficient way to do it. I was just wondering whether this is a case of premature optimization that comes at the price of harder maintainability.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524466
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
+   try {
+   Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+   } catch (Exception e) {
+   Assert.fail("JobManager registration Future didn't 

[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465044#comment-15465044
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524466
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, 

[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77527637
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -35,109 +46,111 @@
final Map taskManagers = new 
HashMap<>();
final Map jobs = new HashMap<>();
 
-   /**
-* Adds a metric to this MetricStore.
-*
-* @param name  the metric identifier
-* @param value the metric value
-*/
-   public void add(String name, Object value) {
-   TaskManagerMetricStore tm;
-   JobMetricStore job;
-   TaskMetricStore task;
-
+   public void add(MetricDump metric) {
try {
-   String[] components = name.split(":");
-   switch (components[0]) {
-   /**
-* JobManagerMetricStore metric
-* format: 0:.
-*/
-   case "0":
-   jobManager.metrics.put(components[1], 
value);
-   break;
-   /**
-* TaskManager metric
-* format: 1::.
-*/
-   case "1":
-   if (components.length != 3) {
-   break;
-   }
-   tm = taskManagers.get(components[1]);
+   QueryScopeInfo info = metric.scopeInfo;
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   String name = info.scope.isEmpty()
+   ? metric.name
+   : info.scope + "." + metric.name;
+   
+   if (name.isEmpty()) { // malformed transmission
+   return;
+   }
+
+   switch (info.getCategory()) {
+   case INFO_CATEGORY_JM:
+   addMetric(jobManager.metrics, name, 
metric);
+   case INFO_CATEGORY_TM:
+   String tmID = 
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+   tm = taskManagers.get(tmID);
if (tm == null) {
tm = new 
TaskManagerMetricStore();
-   taskManagers.put(components[1], 
tm);
+   taskManagers.put(tmID, tm);
}
-   tm.metrics.put(components[2], value);
+   addMetric(tm.metrics, name, metric);
break;
-   /**
-* Job metric
-* format: 2::.
-*/
-   case "2":
-   if (components.length != 3) {
-   break;
-   }
-   job = jobs.get(components[1]);
+   case INFO_CATEGORY_JOB:
+   QueryScopeInfo.JobQueryScopeInfo 
jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+   job = jobs.get(jobInfo.jobID);
if (job == null) {
job = new JobMetricStore();
-   jobs.put(components[1], job);
+   jobs.put(jobInfo.jobID, job);
}
-   job.metrics.put(components[2], value);
+   addMetric(job.metrics, name, metric);
break;
-   /**
-* Task metric
-* format: 
3.
-*
-* As the WebInterface task metric queries 
currently do not account for subtasks we don't 
-* divide by subtask and instead use the 
concatenation of subtask index and metric name as the name. 
-   

[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465132#comment-15465132
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2363
  
I think the changes look good. Thanks for your work @zentol :-) I only had a minor question about whether we can replace the explicit category information with the type information of the metric dumps and the `QueryScopeInfo` instances (not for serialization, but in the `MetricStore`).


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524239
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
 ---
@@ -46,13 +47,21 @@
/** if the slot is allocated, jobId identify which job this slot is 
allocated to; else, jobId is null */
private final JobID jobID;
 
-   public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-   this(slotID, profiler, null, null);
+   /** Gateway to the TaskManager which reported the SlotStatus */
+   private final TaskExecutorGateway taskExecutorGateway;
--- End diff --

The `SlotStatus` is no longer serializable with this field. Where does the 
`SlotStatus` come from? If it's coming from the `TaskExecutor`, then the 
`taskExecutorGateway` has to be retrieved on the `ResourceManager` side.
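A rough sketch of the direction suggested here: the status object carries only serializable identifiers, and the gateway is resolved from a local registry on the ResourceManager side. All names and the lookup call are assumptions for illustration.

```java
import java.io.Serializable;

// Hypothetical sketch: SlotStatus stays serializable by carrying an ID instead
// of a live gateway reference.
public class SlotStatusSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String slotId;        // stands in for SlotID
    private final String taskManagerId; // stands in for ResourceID

    public SlotStatusSketch(String slotId, String taskManagerId) {
        this.slotId = slotId;
        this.taskManagerId = taskManagerId;
    }

    public String getSlotId() {
        return slotId;
    }

    public String getTaskManagerId() {
        return taskManagerId;
    }

    // On the receiving side, the ResourceManager would keep its own mapping from
    // task manager ID to gateway and resolve it when the status arrives, e.g.
    //   gateway = gatewayRegistry.get(status.getTaskManagerId());
}
```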


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465111#comment-15465111
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77528158
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 ---
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Utility class for the serialization of metrics.
+ */
+public class MetricDumpSerialization {
+   private static final Logger LOG = 
LoggerFactory.getLogger(MetricDumpSerialization.class);
+
+   private MetricDumpSerialization() {
+   }
+
+   // = Serialization 
=
+   public static class MetricDumpSerializer {
+   private ByteArrayOutputStream baos = new 
ByteArrayOutputStream(4096);
+   private DataOutputStream dos = new DataOutputStream(baos);
+
+   /**
+* Serializes the given metrics and returns the resulting byte 
array.
+*
+* @param counters   counters to serialize
+* @param gauges gauges to serialize
+* @param histograms histograms to serialize
+* @return byte array containing the serialized metrics
+* @throws IOException
+*/
+   public byte[] serialize(Map> counters, Map> gauges, 
Map> histograms) throws IOException {
+   baos.reset();
+   dos.writeInt(counters.size());
+   dos.writeInt(gauges.size());
+   dos.writeInt(histograms.size());
+
+   for (Map.Entry> 
entry : counters.entrySet()) {
+   serializeMetricInfo(dos, entry.getValue().f0);
+   serializeString(dos, entry.getValue().f1);
+   serializeCounter(dos, entry.getKey());
+   }
+
+   for (Map.Entry> entry : gauges.entrySet()) {
+   serializeMetricInfo(dos, entry.getValue().f0);
+   serializeString(dos, entry.getValue().f1);
+   serializeGauge(dos, entry.getKey());
+   }
+
+   for (Map.Entry> entry : histograms.entrySet()) {
+   serializeMetricInfo(dos, entry.getValue().f0);
+   

[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465000#comment-15465000
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77522339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

Why not make it static?
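The suggested form as a minimal, self-contained sketch (the class name is only a placeholder); SLF4J resolves the logger once per class instead of once per instance.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StaticLoggerSketch {
    // One logger shared by all instances, created when the class is loaded.
    private static final Logger LOG = LoggerFactory.getLogger(StaticLoggerSketch.class);

    public void doSomething() {
        LOG.info("doing something");
    }
}
```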


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77522339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
@@ -52,15 +58,28 @@
  * 
  */
 public class ResourceManager extends RpcEndpoint {
-   private final Map jobMasterGateways;
+
+   private final Logger LOG = LoggerFactory.getLogger(getClass());
--- End diff --

Why not make it static?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77523806
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
+   try {
+   Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+   } catch (Exception e) {
+   Assert.fail("JobManager registration Future didn't 

[jira] [Commented] (FLINK-4579) Add StateBackendFactory for RocksDB Backend

2016-09-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465025#comment-15465025
 ] 

Aljoscha Krettek commented on FLINK-4579:
-

Should be as easy as that, yes.

> Add StateBackendFactory for RocksDB Backend
> ---
>
> Key: FLINK-4579
> URL: https://issues.apache.org/jira/browse/FLINK-4579
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>
> Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}}, 
> which means that users cannot select the RocksDB backend via the Flink 
> configuration.
> If we add a factory for RocksDB, we should also think about adding the RocksDB 
> backend to the standard distribution lib; otherwise it is only usable if 
> users manually place the RocksDB jars in the Flink lib folder.
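As a rough illustration of the shape such a factory could take. The interface, class names, and configuration key below are assumptions made for this sketch and do not reflect the actual Flink `StateBackendFactory` contract of that version.

```java
import java.util.Map;

// Hypothetical sketch only: all types and the config key below are assumed.
public class RocksDBFactorySketch {

    interface AssumedStateBackendFactory<T> {
        T createFromConfig(Map<String, String> config) throws Exception;
    }

    static class AssumedRocksDBStateBackend {
        final String checkpointPath;
        AssumedRocksDBStateBackend(String checkpointPath) {
            this.checkpointPath = checkpointPath;
        }
    }

    static class AssumedRocksDBStateBackendFactory
            implements AssumedStateBackendFactory<AssumedRocksDBStateBackend> {

        @Override
        public AssumedRocksDBStateBackend createFromConfig(Map<String, String> config) throws Exception {
            String path = config.get("state.backend.rocksdb.checkpointdir"); // assumed key
            if (path == null) {
                throw new IllegalArgumentException("RocksDB checkpoint directory is not configured");
            }
            // A real factory would additionally instantiate and configure the
            // RocksDB backend here.
            return new AssumedRocksDBStateBackend(path);
        }
    }
}
```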



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465023#comment-15465023
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77523806
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 ---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+   private static TestingRpcService testRpcService;
+
+   @BeforeClass
+   public static void beforeClass() {
+   testRpcService = new TestingRpcService();
+
+   }
+
+   @AfterClass
+   public static void afterClass() {
+   testRpcService.stopService();
+   testRpcService = null;
+   }
+
+   @Before
+   public void beforeTest(){
+   testRpcService.clearGateways();
+   }
+
+   /**
+* Tests whether
+* 1) SlotRequest is routed to the SlotManager
+* 2) SlotRequest leads to a container allocation
+* 3) SlotRequest is confirmed
+* 4) Slot becomes available and TaskExecutor gets a SlotRequest
+*/
+   @Test
+   public void testSlotsUnavailableRequest() throws Exception {
+   final String rmAddress = "/rm1";
+   final String jmAddress = "/jm1";
+   final JobID jobID = new JobID();
+
+   testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+
+   TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+   ResourceManager resourceManager =
+   new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+   resourceManager.start();
+
+   Future registrationFuture =
+   resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, 

[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465028#comment-15465028
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77523985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -32,4 +33,11 @@
// 

 
void notifyOfNewResourceManagerLeader(String address, UUID 
resourceManagerLeaderId);
+
+   /**
+* Sent by the ResourceManager to the TaskExecutor
+* @param allocationID id for the request
+* @param resourceManagerLeaderID current leader id of the 
ResourceManager
+*/
+   void requestSlot(AllocationID allocationID, UUID 
resourceManagerLeaderID);
--- End diff --

How is the confirmation of the `TaskExecutor` sent back to the 
`SlotManager`? Would it make sense to send it back via the return value of this 
method?
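A sketch of what acknowledging through the return value could look like; `CompletableFuture` stands in for whichever future type the RPC layer uses, and the reply type is a placeholder.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the TaskExecutor confirms the slot request via the
// return value instead of a separate callback to the SlotManager.
interface TaskExecutorGatewaySketch {

    final class SlotRequestReply {
        public final String allocationId; // stands in for AllocationID
        public final boolean accepted;

        public SlotRequestReply(String allocationId, boolean accepted) {
            this.allocationId = allocationId;
            this.accepted = accepted;
        }
    }

    CompletableFuture<SlotRequestReply> requestSlot(String allocationId, UUID resourceManagerLeaderId);
}
```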


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77523985
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -32,4 +33,11 @@
// 

 
void notifyOfNewResourceManagerLeader(String address, UUID 
resourceManagerLeaderId);
+
+   /**
+* Sent by the ResourceManager to the TaskExecutor
+* @param allocationID id for the request
+* @param resourceManagerLeaderID current leader id of the 
ResourceManager
+*/
+   void requestSlot(AllocationID allocationID, UUID 
resourceManagerLeaderID);
--- End diff --

How is the confirmation of the `TaskExecutor` sent back to the 
`SlotManager`? Would it make sense to send it back via the return value of this 
method?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465100#comment-15465100
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77527444
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -35,109 +46,111 @@
final Map taskManagers = new 
HashMap<>();
final Map jobs = new HashMap<>();
 
-   /**
-* Adds a metric to this MetricStore.
-*
-* @param name  the metric identifier
-* @param value the metric value
-*/
-   public void add(String name, Object value) {
-   TaskManagerMetricStore tm;
-   JobMetricStore job;
-   TaskMetricStore task;
-
+   public void add(MetricDump metric) {
try {
-   String[] components = name.split(":");
-   switch (components[0]) {
-   /**
-* JobManagerMetricStore metric
-* format: 0:.
-*/
-   case "0":
-   jobManager.metrics.put(components[1], 
value);
-   break;
-   /**
-* TaskManager metric
-* format: 1::.
-*/
-   case "1":
-   if (components.length != 3) {
-   break;
-   }
-   tm = taskManagers.get(components[1]);
+   QueryScopeInfo info = metric.scopeInfo;
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   String name = info.scope.isEmpty()
+   ? metric.name
+   : info.scope + "." + metric.name;
+   
+   if (name.isEmpty()) { // malformed transmission
+   return;
+   }
+
+   switch (info.getCategory()) {
+   case INFO_CATEGORY_JM:
--- End diff --

What's the benefit of having an explicit type field over using `instanceof`? I think encoding the category via the actual type has the advantage that you cannot mix up classes with the wrong category constants.
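For comparison, a minimal sketch of the `instanceof`-based dispatch being discussed; the scope-info types are simplified stand-ins for the real `QueryScopeInfo` subclasses.

```java
// Hypothetical, simplified sketch of dispatching on the subtype instead of a
// category constant.
public class MetricStoreDispatchSketch {

    static class ScopeInfo {}
    static class JobManagerScopeInfo extends ScopeInfo {}
    static class TaskManagerScopeInfo extends ScopeInfo {
        final String taskManagerID;
        TaskManagerScopeInfo(String id) { this.taskManagerID = id; }
    }
    static class JobScopeInfo extends ScopeInfo {
        final String jobID;
        JobScopeInfo(String id) { this.jobID = id; }
    }

    void add(ScopeInfo info, String name) {
        if (info instanceof JobManagerScopeInfo) {
            System.out.println("store " + name + " under the job manager");
        } else if (info instanceof TaskManagerScopeInfo) {
            String tmId = ((TaskManagerScopeInfo) info).taskManagerID;
            System.out.println("store " + name + " under task manager " + tmId);
        } else if (info instanceof JobScopeInfo) {
            String jobId = ((JobScopeInfo) info).jobID;
            System.out.println("store " + name + " under job " + jobId);
        }
    }
}
```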


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77527444
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -35,109 +46,111 @@
final Map taskManagers = new 
HashMap<>();
final Map jobs = new HashMap<>();
 
-   /**
-* Adds a metric to this MetricStore.
-*
-* @param name  the metric identifier
-* @param value the metric value
-*/
-   public void add(String name, Object value) {
-   TaskManagerMetricStore tm;
-   JobMetricStore job;
-   TaskMetricStore task;
-
+   public void add(MetricDump metric) {
try {
-   String[] components = name.split(":");
-   switch (components[0]) {
-   /**
-* JobManagerMetricStore metric
-* format: 0:.
-*/
-   case "0":
-   jobManager.metrics.put(components[1], 
value);
-   break;
-   /**
-* TaskManager metric
-* format: 1::.
-*/
-   case "1":
-   if (components.length != 3) {
-   break;
-   }
-   tm = taskManagers.get(components[1]);
+   QueryScopeInfo info = metric.scopeInfo;
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   String name = info.scope.isEmpty()
+   ? metric.name
+   : info.scope + "." + metric.name;
+   
+   if (name.isEmpty()) { // malformed transmission
+   return;
+   }
+
+   switch (info.getCategory()) {
+   case INFO_CATEGORY_JM:
--- End diff --

What's the benefit of having an explicit type field over using `instanceof`? I think encoding the category via the actual type has the advantage that you cannot mix up classes with the wrong category constants.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465103#comment-15465103
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77527637
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -35,109 +46,111 @@
final Map taskManagers = new 
HashMap<>();
final Map jobs = new HashMap<>();
 
-   /**
-* Adds a metric to this MetricStore.
-*
-* @param name  the metric identifier
-* @param value the metric value
-*/
-   public void add(String name, Object value) {
-   TaskManagerMetricStore tm;
-   JobMetricStore job;
-   TaskMetricStore task;
-
+   public void add(MetricDump metric) {
try {
-   String[] components = name.split(":");
-   switch (components[0]) {
-   /**
-* JobManagerMetricStore metric
-* format: 0:.
-*/
-   case "0":
-   jobManager.metrics.put(components[1], 
value);
-   break;
-   /**
-* TaskManager metric
-* format: 1::.
-*/
-   case "1":
-   if (components.length != 3) {
-   break;
-   }
-   tm = taskManagers.get(components[1]);
+   QueryScopeInfo info = metric.scopeInfo;
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   String name = info.scope.isEmpty()
+   ? metric.name
+   : info.scope + "." + metric.name;
+   
+   if (name.isEmpty()) { // malformed transmission
+   return;
+   }
+
+   switch (info.getCategory()) {
+   case INFO_CATEGORY_JM:
+   addMetric(jobManager.metrics, name, 
metric);
+   case INFO_CATEGORY_TM:
+   String tmID = 
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+   tm = taskManagers.get(tmID);
if (tm == null) {
tm = new 
TaskManagerMetricStore();
-   taskManagers.put(components[1], 
tm);
+   taskManagers.put(tmID, tm);
}
-   tm.metrics.put(components[2], value);
+   addMetric(tm.metrics, name, metric);
break;
-   /**
-* Job metric
-* format: 2::.
-*/
-   case "2":
-   if (components.length != 3) {
-   break;
-   }
-   job = jobs.get(components[1]);
+   case INFO_CATEGORY_JOB:
+   QueryScopeInfo.JobQueryScopeInfo 
jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+   job = jobs.get(jobInfo.jobID);
if (job == null) {
job = new JobMetricStore();
-   jobs.put(components[1], job);
+   jobs.put(jobInfo.jobID, job);
}
-   job.metrics.put(components[2], value);
+   addMetric(job.metrics, name, metric);
break;
-   /**
-* Task metric
-* format: 
3.
-*
-* As the 

[jira] [Resolved] (FLINK-4479) Replace trademark (tm) with registered trademark (R) sign on Flink website

2016-09-05 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-4479.
---
Resolution: Fixed

Resolved with 
https://github.com/apache/flink-web/commit/f2053d28b12390539275af8d3a6e14941baeba98

> Replace trademark (tm) with registered trademark (R) sign on Flink website
> --
>
> Key: FLINK-4479
> URL: https://issues.apache.org/jira/browse/FLINK-4479
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> Flink is now a registered trademark, so we should reflect that on our website.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465265#comment-15465265
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77538038
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -35,109 +46,111 @@
final Map taskManagers = new 
HashMap<>();
final Map jobs = new HashMap<>();
 
-   /**
-* Adds a metric to this MetricStore.
-*
-* @param name  the metric identifier
-* @param value the metric value
-*/
-   public void add(String name, Object value) {
-   TaskManagerMetricStore tm;
-   JobMetricStore job;
-   TaskMetricStore task;
-
+   public void add(MetricDump metric) {
try {
-   String[] components = name.split(":");
-   switch (components[0]) {
-   /**
-* JobManagerMetricStore metric
-* format: 0:.
-*/
-   case "0":
-   jobManager.metrics.put(components[1], 
value);
-   break;
-   /**
-* TaskManager metric
-* format: 1::.
-*/
-   case "1":
-   if (components.length != 3) {
-   break;
-   }
-   tm = taskManagers.get(components[1]);
+   QueryScopeInfo info = metric.scopeInfo;
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   String name = info.scope.isEmpty()
+   ? metric.name
+   : info.scope + "." + metric.name;
+   
+   if (name.isEmpty()) { // malformed transmission
+   return;
+   }
+
+   switch (info.getCategory()) {
+   case INFO_CATEGORY_JM:
--- End diff --

eh, it seemed like the proper way of handling it. Also, it's (up to) 4 comparisons vs. a jump.


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465303#comment-15465303
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77540202
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 ---
@@ -35,109 +46,111 @@
final Map taskManagers = new 
HashMap<>();
final Map jobs = new HashMap<>();
 
-   /**
-* Adds a metric to this MetricStore.
-*
-* @param name  the metric identifier
-* @param value the metric value
-*/
-   public void add(String name, Object value) {
-   TaskManagerMetricStore tm;
-   JobMetricStore job;
-   TaskMetricStore task;
-
+   public void add(MetricDump metric) {
try {
-   String[] components = name.split(":");
-   switch (components[0]) {
-   /**
-* JobManagerMetricStore metric
-* format: 0:.
-*/
-   case "0":
-   jobManager.metrics.put(components[1], 
value);
-   break;
-   /**
-* TaskManager metric
-* format: 1::.
-*/
-   case "1":
-   if (components.length != 3) {
-   break;
-   }
-   tm = taskManagers.get(components[1]);
+   QueryScopeInfo info = metric.scopeInfo;
+   TaskManagerMetricStore tm;
+   JobMetricStore job;
+   TaskMetricStore task;
+
+   String name = info.scope.isEmpty()
+   ? metric.name
+   : info.scope + "." + metric.name;
+   
+   if (name.isEmpty()) { // malformed transmission
+   return;
+   }
+
+   switch (info.getCategory()) {
+   case INFO_CATEGORY_JM:
--- End diff --

That is true. Performance-wise it is no doubt the more efficient way to execute 
it. I was just wondering whether this is a case of premature optimization at the 
price of harder maintainability.
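
For illustration only, one possible shape of a "more maintainable" alternative 
would be polymorphic dispatch via enum constants instead of a switch over byte 
codes. All names below are hypothetical and not part of the Flink code base; 
JVM-wise the call becomes a virtual dispatch rather than a tableswitch, so the 
cost difference is marginal and the trade-off is purely about readability.

```
// Hedged sketch: per-category behaviour lives on the enum constant, callers never branch.
import java.util.HashMap;
import java.util.Map;

enum ScopeCategory {
    JOB_MANAGER {
        @Override
        void store(Map<String, Object> metrics, String name, Object value) {
            metrics.put("jobmanager." + name, value);
        }
    },
    TASK_MANAGER {
        @Override
        void store(Map<String, Object> metrics, String name, Object value) {
            metrics.put("taskmanager." + name, value);
        }
    };

    /** Each category encapsulates where its metric goes. */
    abstract void store(Map<String, Object> metrics, String name, Object value);
}

class ScopeCategoryDemo {
    public static void main(String[] args) {
        Map<String, Object> metrics = new HashMap<>();
        ScopeCategory.TASK_MANAGER.store(metrics, "numRecordsIn", 42L);
        System.out.println(metrics); // {taskmanager.numRecordsIn=42}
    }
}
```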


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465037#comment-15465037
 ] 

ASF GitHub Bot commented on FLINK-4538:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2463#discussion_r77524239
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
 ---
@@ -46,13 +47,21 @@
/** if the slot is allocated, jobId identify which job this slot is 
allocated to; else, jobId is null */
private final JobID jobID;
 
-   public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-   this(slotID, profiler, null, null);
+   /** Gateway to the TaskManager which reported the SlotStatus */
+   private final TaskExecutorGateway taskExecutorGateway;
--- End diff --

The `SlotStatus` is no longer serializable with this field. Where does the 
`SlotStatus` come from? If it's coming from the `TaskExecutor`, then the 
`taskExecutorGateway` has to be retrieved on the `ResourceManager` side.
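
A minimal sketch of the serializability concern, under assumed types (none of 
the names below are the actual Flink classes): the status message ships only 
serializable identifiers, and the receiving ResourceManager resolves the live 
gateway from the TaskExecutor's id.

```
// Sketch only: ship identifiers, not RPC gateways.
import java.io.Serializable;
import java.util.Map;

final class SlotStatusSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String slotId;          // serializable identifier: safe to send
    private final String taskExecutorId;  // identifier instead of a gateway reference

    // Deliberately NOT a field: a TaskExecutorGateway. An RPC gateway is a live
    // proxy object and would make the whole message non-serializable.

    SlotStatusSketch(String slotId, String taskExecutorId) {
        this.slotId = slotId;
        this.taskExecutorId = taskExecutorId;
    }

    /** The receiver looks the gateway up locally, e.g. in its registration map. */
    static <G> G resolveGateway(SlotStatusSketch status, Map<String, G> registeredTaskExecutors) {
        return registeredTaskExecutors.get(status.taskExecutorId);
    }
}
```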


> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465125#comment-15465125
 ] 

ASF GitHub Bot commented on FLINK-4389:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77529056
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
 ---
@@ -25,6 +25,7 @@
public static final byte METRIC_CATEGORY_COUNTER = 0;
public static final byte METRIC_CATEGORY_GAUGE = 1;
public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+   public static final byte METRIC_CATEGORY_METER = 3;
--- End diff --

Sorry just saw your latest commit.
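
For context, a sketch of why each metric type needs its own category byte 
(this is an assumption for illustration, not the actual Flink dump serializer): 
the byte written first acts as a discriminator so the reading side knows which 
payload layout follows (counter, gauge, histogram, or the new meter).

```
// Illustrative sketch of a category-tagged metric dump.
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MeterDumpSketch {
    static final byte METRIC_CATEGORY_METER = 3;

    static byte[] dumpMeter(String name, double ratePerSecond) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeByte(METRIC_CATEGORY_METER); // discriminator, read first when deserializing
        out.writeUTF(name);                   // metric identifier
        out.writeDouble(ratePerSecond);       // meter payload: the current rate
        out.flush();
        return buffer.toByteArray();
    }
}
```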


> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics, Webfrontend
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend

2016-09-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2363#discussion_r77529056
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
 ---
@@ -25,6 +25,7 @@
public static final byte METRIC_CATEGORY_COUNTER = 0;
public static final byte METRIC_CATEGORY_GAUGE = 1;
public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+   public static final byte METRIC_CATEGORY_METER = 3;
--- End diff --

Sorry just saw your latest commit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4073) YARNSessionCapacitySchedulerITCase.testTaskManagerFailure failed on Travis

2016-09-05 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-4073.
---
   Resolution: Fixed
 Assignee: Maximilian Michels
Fix Version/s: 1.2.0

Fixed with 2f87f61d34414074bc09ba8584d345bd400ed3cd.

> YARNSessionCapacitySchedulerITCase.testTaskManagerFailure failed on Travis
> --
>
> Key: FLINK-4073
> URL: https://issues.apache.org/jira/browse/FLINK-4073
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Maximilian Michels
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.2.0
>
>
> The test case {{YARNSessionCapacitySchedulerITCase.testTaskManagerFailure}} 
> failed on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/137498643/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-09-05 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
The Kafka tests fail:

```
Running org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to 
zookeeper server within timeout: 3
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.server.KafkaServer.initZk(KafkaServer.scala:278)
at kafka.server.KafkaServer.startup(KafkaServer.scala:168)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:336)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:170)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.prepare(KafkaTestEnvironment.java:41)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:131)
at 
org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase.prepare(Kafka09SecureRunITCase.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 47.945 sec 
<<< FAILURE! - in 
org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase
```

The goal should be that all builds pass on Travis before we merge this.
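
For reference, a minimal sketch (not the Flink/Kafka test code) of how this 
ZkTimeoutException arises and one way to work around it on slow CI machines: 
ZkClient gives up if the embedded ZooKeeper is not yet accepting connections 
within the connection timeout, so retrying with a generous timeout usually 
helps. The timeout values and the retry loop are assumptions, not the project's 
actual settings.

```
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkTimeoutException;

public class ZkConnectSketch {

    public static ZkClient connectWithRetry(String zkConnect, int maxAttempts) {
        ZkTimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // sessionTimeout = 30s, connectionTimeout = 30s (assumed values)
                return new ZkClient(zkConnect, 30_000, 30_000);
            } catch (ZkTimeoutException e) {
                last = e; // ZooKeeper may still be starting up; try again
            }
        }
        if (last != null) {
            throw last;
        }
        throw new IllegalArgumentException("maxAttempts must be positive");
    }
}
```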


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465144#comment-15465144
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
The Kafka tests fail:

```
Running org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to 
zookeeper server within timeout: 3
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
at kafka.server.KafkaServer.initZk(KafkaServer.scala:278)
at kafka.server.KafkaServer.startup(KafkaServer.scala:168)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:336)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:170)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.prepare(KafkaTestEnvironment.java:41)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:131)
at 
org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase.prepare(Kafka09SecureRunITCase.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 47.945 sec 
<<< FAILURE! - in 
org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase
```

The goal should be that all builds pass on Travis before we merge this.


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465855#comment-15465855
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
@mxm I believe the ZK timeout issue comes from the 
LocalFlinkMiniClusterITCase->testLocalFlinkMiniClusterWithMultipleTaskManagers 
test case, but it is not consistent. I ran the Kafka test case alone and it 
worked. I also ran "mvn clean verify" and saw no errors (after a couple of 
retries; earlier runs hit the same ZK timeout error from 
LocalFlinkMiniClusterITCase). It looks like some of the integration test 
scenarios are flaky.
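
One common way to cope with such intermittent failures is a retry rule. The 
following is only a hedged sketch of that pattern (Flink's test utilities may 
already provide something similar; this class is not part of them), used as 
`@Rule public RetryOnFailureRule retry = new RetryOnFailureRule(3);` on the 
flaky test class.

```
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

public class RetryOnFailureRule implements TestRule {

    private final int maxAttempts;

    public RetryOnFailureRule(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
    }

    @Override
    public Statement apply(final Statement base, final Description description) {
        return new Statement() {
            @Override
            public void evaluate() throws Throwable {
                Throwable last = null;
                for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                    try {
                        base.evaluate();   // run the test body
                        return;            // passed, stop retrying
                    } catch (Throwable t) {
                        last = t;          // flaky failure, try again
                    }
                }
                throw last;                // exhausted retries, surface the last failure
            }
        };
    }
}
```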


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

