[GitHub] yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-12 Thread GitBox
yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report 
the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429502056
 
 
   @tillrohrmann rebased and all checks have passed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648683#comment-16648683 ]

ASF GitHub Bot commented on FLINK-10135:


yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report 
the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429502056
 
 
   @tillrohrmann rebased and all checks have passed.




> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.
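The gap the issue describes can be checked mechanically: the endpoint returns a JSON array of {"id": ...} entries, so one can diff the documented metric names against a response body. A minimal sketch in plain Java (the sample payload is hypothetical, not a captured response):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ClusterMetricsCheck {
    // The cluster-level metric names the 1.5 docs attribute to the JobManager.
    static final List<String> EXPECTED = List.of(
            "numRegisteredTaskManagers", "numRunningJobs",
            "taskSlotsAvailable", "taskSlotsTotal");

    // Returns the documented names absent from a /jobmanager/metrics response
    // body, which is a JSON array of {"id": "..."} entries.
    static List<String> missing(String responseBody) {
        return EXPECTED.stream()
                .filter(id -> !responseBody.contains("\"id\":\"" + id + "\""))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical body reproducing the report: cluster metrics absent.
        String body = "[{\"id\":\"Status.JVM.CPU.Load\"}]";
        System.out.println("missing: " + missing(body));
    }
}
```

Pointing missing() at a real response from the JobManager's /jobmanager/metrics endpoint would confirm or refute the report for a given build.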



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-12 Thread Shuyi Chen (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648665#comment-16648665 ]

Shuyi Chen commented on FLINK-10516:


[~till.rohrmann], [~aljoscha], do you want me to cherry-pick the change onto the 
release-1.5 and release-1.6 branches? Thanks.

> YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
> Configuration during setup
> ---
>
> Key: FLINK-10516
> URL: https://issues.apache.org/jira/browse/FLINK-10516
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> Will add a fix, and refactor YarnApplicationMasterRunner to add a unit test to 
> prevent future regressions.





[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-12 Thread Shuyi Chen (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648664#comment-16648664 ]

Shuyi Chen commented on FLINK-10516:


This is fixed in 1.7.0 with 5e90ed95a580aefd84b72f593954d01f4eb67f68.






[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648660#comment-16648660
 ] 

ASF GitHub Bot commented on FLINK-10516:


asfgit closed pull request #6836: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initiali…
URL: https://github.com/apache/flink/pull/6836
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index e98e174cbdb..c275bfe0a02 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -67,6 +68,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -163,6 +165,13 @@ protected int run(String[] args) {
 
 		final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties, LOG);
 
+		// configure the filesystems
+		try {
+			FileSystem.initialize(flinkConfig);
+		} catch (IOException e) {
+			throw new IOException("Error while configuring the filesystems.", e);
+		}
+
 		File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
 		if (remoteKeytabPrincipal != null && f.exists()) {
 			String keytabPath = f.getAbsolutePath();
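The substance of the patch is ordering: FileSystem.initialize(flinkConfig) must run during setup, before anything opens a file system, or later accesses fall back to defaults. A self-contained analog of that eager-initialization idiom (plain Java, no Flink dependency; FileSystemRegistry is an illustrative stand-in, not Flink's FileSystem class):

```java
import java.util.Map;

public class FileSystemRegistry {
    private static volatile Map<String, String> config;

    // Analog of FileSystem.initialize(flinkConfig): install the cluster
    // configuration once, during setup, before any file-system access.
    static void initialize(Map<String, String> flinkConfig) {
        config = Map.copyOf(flinkConfig);
    }

    // Analog of a later file-system access: fails loudly when setup skipped
    // the initialization step, instead of silently using defaults.
    static String setting(String key) {
        if (config == null) {
            throw new IllegalStateException("not initialized");
        }
        return config.getOrDefault(key, "<default>");
    }

    public static void main(String[] args) {
        initialize(Map.of("fs.hdfs.hadoopconf", "/etc/hadoop/conf"));
        System.out.println(setting("fs.hdfs.hadoopconf"));
    }
}
```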


 











[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648657#comment-16648657 ]

ASF GitHub Bot commented on FLINK-10516:


yanyan300300 commented on issue #6828: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
Configuration during setup
URL: https://github.com/apache/flink/pull/6828#issuecomment-429497882
 
 
   Thanks all for reviewing. I closed this PR and opened 
https://github.com/apache/flink/pull/6836 against master instead of Flink 1.4, 
and removed the unit test.











[jira] [Commented] (FLINK-10542) Register Hive metastore as an external catalog in TableEnvironment

2018-10-12 Thread sunjincheng (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648654#comment-16648654 ]

sunjincheng commented on FLINK-10542:
-

+1. This is very important work for the Flink ecosystem. [~xuefuz]

> Register Hive metastore as an external catalog in TableEnvironment
> --
>
> Key: FLINK-10542
> URL: https://issues.apache.org/jira/browse/FLINK-10542
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> Similar to FLINK-2167 but rather register Hive metastore as an external 
> catalog in the {{TableEnvironment}}. After registration, Table API and SQL 
> queries should be able to access all Hive tables.
> This might supersede the need of FLINK-2167 because Hive metastore stores a 
> superset of tables available via hCat without an indirection.





[jira] [Created] (FLINK-10542) Register Hive metastore as an external catalog in TableEnvironment

2018-10-12 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created FLINK-10542:
---

 Summary: Register Hive metastore as an external catalog in 
TableEnvironment
 Key: FLINK-10542
 URL: https://issues.apache.org/jira/browse/FLINK-10542
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Affects Versions: 1.6.0
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang


Similar to FLINK-2167 but rather register Hive metastore as an external catalog 
in the {{TableEnvironment}}. After registration, Table API and SQL queries 
should be able to access all Hive tables.

This might supersede the need of FLINK-2167 because Hive metastore stores a 
superset of tables available via hCat without an indirection.
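A toy model of the proposal, in plain Java (MetastoreClient, ExternalCatalog, and registerExternalCatalog are illustrative stand-ins, not the real Flink or Hive API): register a catalog backed by the metastore once, and let table lookups resolve through it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ExternalCatalogSketch {
    // Stand-in for a Hive metastore client.
    interface MetastoreClient {
        Optional<String> tableSchema(String table);
    }

    // Stand-in for an external catalog delegating lookups to the metastore.
    static final class ExternalCatalog {
        private final MetastoreClient client;
        ExternalCatalog(MetastoreClient client) { this.client = client; }
        String schemaOf(String table) {
            return client.tableSchema(table).orElseThrow(
                    () -> new IllegalArgumentException("unknown table: " + table));
        }
    }

    static final Map<String, ExternalCatalog> catalogs = new HashMap<>();

    static void registerExternalCatalog(String name, ExternalCatalog catalog) {
        catalogs.put(name, catalog);
    }

    public static void main(String[] args) {
        // Stubbed metastore knowing one table; a real one would query Hive.
        MetastoreClient hive = t -> "orders".equals(t)
                ? Optional.of("id BIGINT, amount DOUBLE") : Optional.empty();
        registerExternalCatalog("hive", new ExternalCatalog(hive));
        // After registration, a query referencing hive.orders resolves here.
        System.out.println(catalogs.get("hive").schemaOf("orders"));
    }
}
```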





[GitHub] suez1224 commented on issue #6836: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initiali…

2018-10-12 Thread GitBox
suez1224 commented on issue #6836: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initiali…
URL: https://github.com/apache/flink/pull/6836#issuecomment-429481759
 
 
   LGTM. Will merge after the tests run through.






[jira] [Commented] (FLINK-6036) Let catalog support partition

2018-10-12 Thread Xuefu Zhang (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648525#comment-16648525 ]

Xuefu Zhang commented on FLINK-6036:


Hi [~jinyu.zj], are you interested in moving this forward? This seems to be a 
prerequisite of the Flink-Hive integration that is under discussion in the 
community. Thanks.

> Let catalog support partition
> -
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: jingzhang
>Assignee: jingzhang
>Priority: Major
>
> Currently, the catalog only supports CRUD operations at the database and table 
> level. But for some kinds of catalogs, for example Hive, we also need CRUD 
> operations at the partition level. 
> This issue aims to let the catalog support partitions.





[GitHub] yanyan300300 opened a new pull request #6836: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initiali…

2018-10-12 Thread GitBox
yanyan300300 opened a new pull request #6836: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initiali…
URL: https://github.com/apache/flink/pull/6836
 
 
   ## What is the purpose of the change
   
   This pull request makes YarnApplicationMasterRunner explicitly initialize 
the FileSystem with the Flink Configuration, so that HadoopFileSystem can pick 
up custom HDFS configuration (e.g. core-site.xml under fs.hdfs.hadoopconf).
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable






[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648492#comment-16648492 ]

ASF GitHub Bot commented on FLINK-10516:


suez1224 commented on issue #6828: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
Configuration during setup
URL: https://github.com/apache/flink/pull/6828#issuecomment-429471339
 
 
   @yanyan300300 , I am fine with adding just the fix w/o the unit test given 
the code will be deprecated in 1.7 in 
[FLINK-10392|https://issues.apache.org/jira/browse/FLINK-10392].











[jira] [Closed] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-12 Thread Fabian Hueske (JIRA)


 [ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske closed FLINK-10156.
-
   Resolution: Done
Fix Version/s: 1.7.0

Done with d1a03dd239555298da9ac9be4ea94ccf52d9887b

> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  
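The three reasons above can be made concrete with a toy model of the registered-sink style (plain Java; the names echo but are not the real Table API): the query side only ever sees a sink name, and insertInto resolves it in the environment's catalog, mirroring INSERT INTO ... SELECT.

```java
import java.util.HashMap;
import java.util.Map;

public class SinkCatalogSketch {
    interface Sink { void emit(String row); }

    // The environment's catalog: sinks are registered by name, so sink
    // instances never travel through the query API.
    static final Map<String, Sink> catalog = new HashMap<>();
    static final StringBuilder out = new StringBuilder();

    static void registerTableSink(String name, Sink sink) {
        catalog.put(name, sink);
    }

    // Analog of Table.insertInto(name): look the sink up, then emit.
    static void insertInto(String name, String row) {
        Sink sink = catalog.get(name);
        if (sink == null) {
            throw new IllegalArgumentException("no sink registered as: " + name);
        }
        sink.emit(row);
    }

    public static void main(String[] args) {
        registerTableSink("outputTable", out::append);
        insertInto("outputTable", "a,1");
        System.out.println(out);
    }
}
```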





[jira] [Commented] (FLINK-10528) Remove deprecated APIs from Table API for Flink 1.7.0

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648479#comment-16648479 ]

ASF GitHub Bot commented on FLINK-10528:


asfgit closed pull request #6826: [FLINK-10528] [table] Remove methods that 
were deprecated in Flink 1.4.0
URL: https://github.com/apache/flink/pull/6826
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index a3b7093dd25..3767e5a1a99 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -457,7 +457,7 @@ public void testCassandraTableSink() throws Exception {
 
 		tEnv.registerDataStreamInternal("testFlinkTable", source);
 
-		tEnv.sql("select * from testFlinkTable").writeToSink(
+		tEnv.sqlQuery("select * from testFlinkTable").writeToSink(
 			new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)));
 
 		env.execute();
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index 27b75d49ac9..4bb0f31978c 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -51,7 +51,7 @@
 * hSrc.addColumn("fam2", "col1", String.class);
 *
 * tableEnv.registerTableSource("hTable", hSrc);
- * Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
+ * Table res = tableEnv.sqlQuery("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
 * }
 * 
 *
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
index 860999c80db..6e3ada4c493 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
@@ -75,7 +75,7 @@
 *   .build();
 *
 * tEnv.registerTableSource("orcTable", orcSrc);
- * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * Table res = tableEnv.sqlQuery("SELECT * FROM orcTable");
 * }
 * 
 */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d740c3f1f99..6fe0a9e3a64 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -656,30 +656,6 @@ abstract class TableEnvironment(val config: TableConfig) {
     */
   def explain(table: Table): String
 
-  /**
-    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
-    *
-    * All tables referenced by the query must be registered in the TableEnvironment.
-    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
-    * when it is embedded into a String.
-    * Hence, SQL queries can directly reference a [[Table]] as follows:
-    *
-    * {{{
-    *   val table: Table = ...
-    *   // the table is not registered to the table environment
-    *   tEnv.sql(s"SELECT * FROM $table")
-    * }}}
-    *
-    * @deprecated Use sqlQuery() instead.
-    * @param query The SQL query to evaluate.
-    * @return The result of the query as Table.
-    */
-  @Deprecated
-  @deprecated("Please use sqlQuery() instead.")
-  def sql(query: String): Table = {
-    sqlQuery(query)
-  }
-
   /**
     * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
     *
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index 2da08faa12e..53fb7a08d47 100644
--- 

[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648478#comment-16648478 ]

ASF GitHub Bot commented on FLINK-10156:


asfgit closed pull request #6805: [FLINK-10156][table] Deprecate 
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 146d1a6c1fd..622fe9ffedf 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -46,6 +46,8 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 tableEnv.registerTable("table1", ...)// or
 tableEnv.registerTableSource("table2", ...); // or
 tableEnv.registerExternalCatalog("extCat", ...);
+// register an output Table
+tableEnv.registerTableSink("outputTable", ...);
 
 // create a Table from a Table API query
 Table tapiResult = tableEnv.scan("table1").select(...);
@@ -53,7 +55,7 @@ Table tapiResult = tableEnv.scan("table1").select(...);
 Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.writeToSink(...);
+tapiResult.insertInto("outputTable");
 
 // execute
 env.execute();
@@ -72,7 +74,9 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
 // register a Table
 tableEnv.registerTable("table1", ...)   // or
 tableEnv.registerTableSource("table2", ...) // or
-tableEnv.registerExternalCatalog("extCat", ...) 
+tableEnv.registerExternalCatalog("extCat", ...)
+// register an output Table
+tableEnv.registerTableSink("outputTable", ...);
 
 // create a Table from a Table API query
 val tapiResult = tableEnv.scan("table1").select(...)
@@ -80,7 +84,7 @@ val tapiResult = tableEnv.scan("table1").select(...)
 val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.writeToSink(...)
+tapiResult.insertInto("outputTable")
 
 // execute
 env.execute()
@@ -500,10 +504,7 @@ A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Ta
 
 Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.
 
-There are two ways to emit a table:
-
-1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit.
-2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`.
+The `Table.insertInto(String tableName)` method emits the `Table` to a 
registered `TableSink`. The method looks up the `TableSink` from the catalog by 
the name and validates that the schema of the `Table` is identical to the 
schema of the `TableSink`. 
 
 The following examples show how to emit a `Table`:
 
@@ -513,22 +514,17 @@ The following examples shows how to emit a `Table`:
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
 StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
-// compute a result Table using Table API operators and/or SQL queries
-Table result = ...
-
 // create a TableSink
 TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
 
-// METHOD 1:
-//   Emit the result Table to the TableSink via the writeToSink() method
-result.writeToSink(sink);
-
-// METHOD 2:
-//   Register the TableSink with a specific schema
+// register the TableSink with a specific schema
 String[] fieldNames = {"a", "b", "c"};
 TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
 tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
-//   Emit the result Table to the registered TableSink via the insertInto() 
method
+
+// compute a result Table using Table API operators and/or SQL queries
+Table result = ...
+// emit the result Table to the registered TableSink
 result.insertInto("CsvSinkTable");
 
 // execute the program
@@ -540,22 +536,18 @@ result.insertInto("CsvSinkTable");
 // get a TableEnvironment
 val tableEnv = TableEnvironment.getTableEnvironment(env)
 
-// compute a result Table using Table API operators and/or SQL queries
-val result: Table = ...
-
 // create a TableSink
 val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
 
-// METHOD 1:
-//   Emit the result Table to 

[jira] [Closed] (FLINK-10528) Remove deprecated APIs from Table API for Flink 1.7.0

2018-10-12 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-10528.
-
   Resolution: Done
Fix Version/s: 1.7.0

Done with 5e90ed95a580aefd84b72f593954d01f4eb67f68

> Remove deprecated APIs from Table API for Flink 1.7.0
> -
>
> Key: FLINK-10528
> URL: https://issues.apache.org/jira/browse/FLINK-10528
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> There are a few APIs that have been deprecated for a while (since Flink 
> 1.4.0) and that could be removed:
>  * {{TableEnvironment.sql()}} as of FLINK-6442
>  * {{StreamTableElnvironment.toDataStream()}} and 
> {{TableConversions.toDataStream()}} as of FLINK-6543
>  * {{Table.limit()}} as of FLINK-7821



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6826: [FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0

2018-10-12 Thread GitBox
asfgit closed pull request #6826: [FLINK-10528] [table] Remove methods that 
were deprecated in Flink 1.4.0
URL: https://github.com/apache/flink/pull/6826
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index a3b7093dd25..3767e5a1a99 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -457,7 +457,7 @@ public void testCassandraTableSink() throws Exception {
 
tEnv.registerDataStreamInternal("testFlinkTable", source);
 
-   tEnv.sql("select * from testFlinkTable").writeToSink(
+   tEnv.sqlQuery("select * from testFlinkTable").writeToSink(
new CassandraAppendTableSink(builder, 
injectTableName(INSERT_DATA_QUERY)));
 
env.execute();
diff --git 
a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index 27b75d49ac9..4bb0f31978c 100644
--- 
a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -51,7 +51,7 @@
  * hSrc.addColumn("fam2", "col1", String.class);
  *
  * tableEnv.registerTableSource("hTable", hSrc);
- * Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable 
AS t GROUP BY t.fam2.col1");
+ * Table res = tableEnv.sqlQuery("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM 
hTable AS t GROUP BY t.fam2.col1");
  * }
  * 
  *
diff --git 
a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
 
b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
index 860999c80db..6e3ada4c493 100644
--- 
a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
+++ 
b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
@@ -75,7 +75,7 @@
  *   .build();
  *
  * tEnv.registerTableSource("orcTable", orcSrc);
- * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * Table res = tableEnv.sqlQuery("SELECT * FROM orcTable");
  * }
  * 
  */
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d740c3f1f99..6fe0a9e3a64 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -656,30 +656,6 @@ abstract class TableEnvironment(val config: TableConfig) {
 */
   def explain(table: Table): String
 
-  /**
-* Evaluates a SQL query on registered tables and retrieves the result as a 
[[Table]].
-*
-* All tables referenced by the query must be registered in the 
TableEnvironment.
-* A [[Table]] is automatically registered when its [[toString]] method is 
called, for example
-* when it is embedded into a String.
-* Hence, SQL queries can directly reference a [[Table]] as follows:
-*
-* {{{
-*   val table: Table = ...
-*   // the table is not registered to the table environment
-*   tEnv.sql(s"SELECT * FROM $table")
-* }}}
-*
-* @deprecated Use sqlQuery() instead.
-* @param query The SQL query to evaluate.
-* @return The result of the query as Table.
-*/
-  @Deprecated
-  @deprecated("Please use sqlQuery() instead.")
-  def sql(query: String): Table = {
-sqlQuery(query)
-  }
-
   /**
 * Evaluates a SQL query on registered tables and retrieves the result as a 
[[Table]].
 *
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index 2da08faa12e..53fb7a08d47 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
@@ -133,115 +133,6 @@ class StreamTableEnvironment(
 

[GitHub] asfgit closed pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-12 Thread GitBox
asfgit closed pull request #6805: [FLINK-10156][table] Deprecate 
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 146d1a6c1fd..622fe9ffedf 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -46,6 +46,8 @@ StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
 tableEnv.registerTable("table1", ...)// or
 tableEnv.registerTableSource("table2", ...); // or
 tableEnv.registerExternalCatalog("extCat", ...);
+// register an output Table
+tableEnv.registerTableSink("outputTable", ...);
 
 // create a Table from a Table API query
 Table tapiResult = tableEnv.scan("table1").select(...);
@@ -53,7 +55,7 @@ Table tapiResult = tableEnv.scan("table1").select(...);
 Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.writeToSink(...);
+tapiResult.insertInto("outputTable");
 
 // execute
 env.execute();
@@ -72,7 +74,9 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
 // register a Table
 tableEnv.registerTable("table1", ...)   // or
 tableEnv.registerTableSource("table2", ...) // or
-tableEnv.registerExternalCatalog("extCat", ...) 
+tableEnv.registerExternalCatalog("extCat", ...)
+// register an output Table
+tableEnv.registerTableSink("outputTable", ...);
 
 // create a Table from a Table API query
 val tapiResult = tableEnv.scan("table1").select(...)
@@ -80,7 +84,7 @@ val tapiResult = tableEnv.scan("table1").select(...)
 val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.writeToSink(...)
+tapiResult.insertInto("outputTable")
 
 // execute
 env.execute()
@@ -500,10 +504,7 @@ A batch `Table` can only be written to a `BatchTableSink`, 
while a streaming `Ta
 
 Please see the documentation about [Table Sources & Sinks]({{ site.baseurl 
}}/dev/table/sourceSinks.html) for details about available sinks and 
instructions for how to implement a custom `TableSink`.
 
-There are two ways to emit a table:
-
-1. The `Table.writeToSink(TableSink sink)` method emits the table using the 
provided `TableSink` and automatically configures the sink with the schema of 
the table to emit.
-2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that 
was registered with a specific schema under the provided name in the 
`TableEnvironment`'s catalog. The schema of the table to emit is validated 
against the schema of the registered `TableSink`.
+The `Table.insertInto(String tableName)` method emits the `Table` to a 
registered `TableSink`. The method looks up the `TableSink` from the catalog by 
the name and validates that the schema of the `Table` is identical to the 
schema of the `TableSink`. 
 
 The following examples show how to emit a `Table`:
 
@@ -513,22 +514,17 @@ The following examples shows how to emit a `Table`:
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
 StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
-// compute a result Table using Table API operators and/or SQL queries
-Table result = ...
-
 // create a TableSink
 TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
 
-// METHOD 1:
-//   Emit the result Table to the TableSink via the writeToSink() method
-result.writeToSink(sink);
-
-// METHOD 2:
-//   Register the TableSink with a specific schema
+// register the TableSink with a specific schema
 String[] fieldNames = {"a", "b", "c"};
 TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
 tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
-//   Emit the result Table to the registered TableSink via the insertInto() 
method
+
+// compute a result Table using Table API operators and/or SQL queries
+Table result = ...
+// emit the result Table to the registered TableSink
 result.insertInto("CsvSinkTable");
 
 // execute the program
@@ -540,22 +536,18 @@ result.insertInto("CsvSinkTable");
 // get a TableEnvironment
 val tableEnv = TableEnvironment.getTableEnvironment(env)
 
-// compute a result Table using Table API operators and/or SQL queries
-val result: Table = ...
-
 // create a TableSink
 val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
 
-// METHOD 1:
-//   Emit the result Table to the TableSink via the writeToSink() method
-result.writeToSink(sink)
-
-// METHOD 2:
-//   Register the TableSink with a specific schema
+// register the TableSink with a specific schema
 val fieldNames: Array[String] = Array("a", "b", "c")
 

[jira] [Resolved] (FLINK-10532) Broken links in documentation

2018-10-12 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-10532.
--
   Resolution: Fixed
Fix Version/s: 1.6.2

Fixed in 1.7.0: 383cf887f9936d53d37ba907e03522fb8c88a67d
Fixed in 1.6.2: 8601555896b2016804303ce7baa6a8b49dd3b07e

> Broken links in documentation
> -
>
> Key: FLINK-10532
> URL: https://issues.apache.org/jira/browse/FLINK-10532
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.7.0, 1.6.2
>
>
> https://travis-ci.org/apache/flink/builds/440115490#L599
> {code:java}
> http://localhost:4000/dev/stream/operators.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/dev/table/streaming/sql.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/dev/table/streaming.html:
> Remote file does not exist -- broken link!!!{code}





[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648401#comment-16648401
 ] 

ASF GitHub Bot commented on FLINK-10156:


fhueske commented on issue #6805: [FLINK-10156][table] Deprecate 
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429448411
 
 
   merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  
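The catalog-based pattern described above can be sketched as a toy model. This is plain Python, not Flink's actual API; the class and method names below (`TableEnvironment`, `register_table_sink`, `insert_into`) only mimic the Table API for illustration, and the schema comparison is a simplified stand-in for the validation that `insertInto()` performs against the registered sink:

```python
# Toy model of the catalog-based sink pattern behind Table.insertInto():
# sinks are registered under a name together with a schema, and emitting
# a table looks the sink up by name and validates the schemas match.

class CatalogError(Exception):
    pass

class Table:
    def __init__(self, schema, rows):
        # schema: sequence of (field_name, field_type) pairs
        self.schema = tuple(schema)
        self.rows = list(rows)

class TableEnvironment:
    def __init__(self):
        self._sinks = {}  # name -> (schema, received rows)

    def register_table_sink(self, name, schema):
        # register a sink with an explicit schema in the catalog
        self._sinks[name] = (tuple(schema), [])

    def insert_into(self, table, sink_name):
        # look up the TableSink by name, as insertInto() does
        if sink_name not in self._sinks:
            raise CatalogError("no sink registered under '%s'" % sink_name)
        schema, rows = self._sinks[sink_name]
        # validate the table's schema against the registered sink's schema
        if table.schema != schema:
            raise CatalogError(
                "schema mismatch: table %s vs sink %s" % (table.schema, schema))
        rows.extend(table.rows)

    def sink_rows(self, sink_name):
        return self._sinks[sink_name][1]

env = TableEnvironment()
env.register_table_sink("outputTable", [("a", "INT"), ("b", "STRING")])
result = Table([("a", "INT"), ("b", "STRING")], [(1, "x"), (2, "y")])
env.insert_into(result, "outputTable")
print(env.sink_rows("outputTable"))  # [(1, 'x'), (2, 'y')]
```

A mismatching schema raises an error instead of silently writing rows, which is the point of registering sinks with an explicit schema in the catalog rather than handing a `TableSink` instance to the query.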





[jira] [Commented] (FLINK-10528) Remove deprecated APIs from Table API for Flink 1.7.0

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648402#comment-16648402
 ] 

ASF GitHub Bot commented on FLINK-10528:


fhueske commented on issue #6826: [FLINK-10528] [table] Remove methods that 
were deprecated in Flink 1.4.0
URL: https://github.com/apache/flink/pull/6826#issuecomment-429448476
 
 
   merging




> Remove deprecated APIs from Table API for Flink 1.7.0
> -
>
> Key: FLINK-10528
> URL: https://issues.apache.org/jira/browse/FLINK-10528
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.7.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Minor
>  Labels: pull-request-available
>
> There are a few APIs that have been deprecated for a while (since Flink 
> 1.4.0) and that could be removed:
>  * {{TableEnvironment.sql()}} as of FLINK-6442
>  * {{StreamTableElnvironment.toDataStream()}} and 
> {{TableConversions.toDataStream()}} as of FLINK-6543
>  * {{Table.limit()}} as of FLINK-7821





[GitHub] fhueske commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-12 Thread GitBox
fhueske commented on issue #6805: [FLINK-10156][table] Deprecate 
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429448411
 
 
   merging




With regards,
Apache Git Services


[GitHub] fhueske commented on issue #6826: [FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0

2018-10-12 Thread GitBox
fhueske commented on issue #6826: [FLINK-10528] [table] Remove methods that 
were deprecated in Flink 1.4.0
URL: https://github.com/apache/flink/pull/6826#issuecomment-429448476
 
 
   merging




[jira] [Updated] (FLINK-10541) Removed unused code depends on LocalFlinkMiniCluster

2018-10-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10541:
---
Labels: pull-request-available  (was: )

> Removed unused code depends on LocalFlinkMiniCluster
> 
>
> Key: FLINK-10541
> URL: https://issues.apache.org/jira/browse/FLINK-10541
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{TestBaseUtils#startCluster}} depends on legacy class 
> {{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove 
> it and help confirm we can directly remove {{LocalFlinkMiniCluster}}.





[GitHub] TisonKun opened a new pull request #6835: [FLINK-10541] [tests] Removed unused code depends on LocalFlinkMiniCl…

2018-10-12 Thread GitBox
TisonKun opened a new pull request #6835: [FLINK-10541] [tests] Removed unused 
code depends on LocalFlinkMiniCl…
URL: https://github.com/apache/flink/pull/6835
 
 
   …uster
   
   
   
   ## What is the purpose of the change
   
   `TestBaseUtils#startCluster` and `TestBaseUtils#stopCluster` depend on the 
legacy class `LocalFlinkMiniCluster`; however, these methods are unused. Let's 
remove them to help confirm that we can remove `LocalFlinkMiniCluster` directly.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**no**)
   
   cc @tillrohrmann @zentol 
   




[jira] [Commented] (FLINK-10541) Removed unused code depends on LocalFlinkMiniCluster

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648399#comment-16648399
 ] 

ASF GitHub Bot commented on FLINK-10541:


TisonKun opened a new pull request #6835: [FLINK-10541] [tests] Removed unused 
code depends on LocalFlinkMiniCl…
URL: https://github.com/apache/flink/pull/6835
 
 
   …uster
   
   
   
   ## What is the purpose of the change
   
   `TestBaseUtils#startCluster` and `TestBaseUtils#stopCluster` depend on the 
legacy class `LocalFlinkMiniCluster`; however, these methods are unused. Let's 
remove them to help confirm that we can remove `LocalFlinkMiniCluster` directly.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**no**)
   
   cc @tillrohrmann @zentol 
   




> Removed unused code depends on LocalFlinkMiniCluster
> 
>
> Key: FLINK-10541
> URL: https://issues.apache.org/jira/browse/FLINK-10541
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{TestBaseUtils#startCluster}} depends on legacy class 
> {{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove 
> it and help confirm we can directly remove {{LocalFlinkMiniCluster}}.





[jira] [Updated] (FLINK-10541) Removed unused code depends on LocalFlinkMiniCluster

2018-10-12 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-10541:
-
Summary: Removed unused code depends on LocalFlinkMiniCluster  (was: 
Removed unused code of LocalFlinkMiniCluster)

> Removed unused code depends on LocalFlinkMiniCluster
> 
>
> Key: FLINK-10541
> URL: https://issues.apache.org/jira/browse/FLINK-10541
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> {{TestBaseUtils#startCluster}} depends on legacy class 
> {{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove 
> it and help confirm we can directly remove {{LocalFlinkMiniCluster}}.





[jira] [Updated] (FLINK-10541) Removed unused code of LocalFlinkMiniCluster

2018-10-12 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-10541:
-
Issue Type: Sub-task  (was: Task)
Parent: FLINK-10392

> Removed unused code of LocalFlinkMiniCluster
> 
>
> Key: FLINK-10541
> URL: https://issues.apache.org/jira/browse/FLINK-10541
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
> Fix For: 1.7.0
>
>
> {{TestBaseUtils#startCluster}} depends on legacy class 
> {{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove 
> it and help confirm we can directly remove {{LocalFlinkMiniCluster}}.





[jira] [Created] (FLINK-10541) Removed unused code of LocalFlinkMiniCluster

2018-10-12 Thread TisonKun (JIRA)
TisonKun created FLINK-10541:


 Summary: Removed unused code of LocalFlinkMiniCluster
 Key: FLINK-10541
 URL: https://issues.apache.org/jira/browse/FLINK-10541
 Project: Flink
  Issue Type: Task
  Components: Tests
Affects Versions: 1.7.0
Reporter: TisonKun
Assignee: TisonKun
 Fix For: 1.7.0


{{TestBaseUtils#startCluster}} depends on legacy class 
{{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove 
it and help confirm we can directly remove {{LocalFlinkMiniCluster}}.





[jira] [Created] (FLINK-10540) Remove legacy LocalFlinkMiniCluster

2018-10-12 Thread TisonKun (JIRA)
TisonKun created FLINK-10540:


 Summary: Remove legacy LocalFlinkMiniCluster
 Key: FLINK-10540
 URL: https://issues.apache.org/jira/browse/FLINK-10540
 Project: Flink
  Issue Type: Sub-task
Affects Versions: 1.7.0
Reporter: TisonKun
 Fix For: 1.7.0


{{LocalFlinkMiniCluster}} is based on legacy cluster mode and should be no 
longer used.





[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648353#comment-16648353
 ] 

ASF GitHub Bot commented on FLINK-10156:


fhueske commented on issue #6805: [FLINK-10156][table] Deprecate 
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429437884
 
 
   Thanks for the review @sunjincheng121!




> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  





[GitHub] fhueske commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-12 Thread GitBox
fhueske commented on issue #6805: [FLINK-10156][table] Deprecate 
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429437884
 
 
   Thanks for the review @sunjincheng121!




[jira] [Commented] (FLINK-10508) Port JobManagerITCase to new code base

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648350#comment-16648350
 ] 

ASF GitHub Bot commented on FLINK-10508:


TisonKun commented on issue #6834: [FLINK-10508] [tests] Port JobManagerITCase 
to new code base
URL: https://github.com/apache/flink/pull/6834#issuecomment-429436854
 
 
   After the commits above, `JobManagerITCase` still contains 4 tests that I am 
not quite sure how to handle, so I'd like to cc @tillrohrmann and @tzulitai 
for advice.
   
   1. `"JobManagerITCase.The JobManager actor must remove execution graphs when 
the client ends the session explicitly"` and `"JobManagerITCase.The JobManager 
actor must remove execution graphs when when the client's session times out"`: 
since `sessionTimeout` no longer seems to be used in the FLIP-6 codebase, and 
the execution graph archiving mechanism is different, I tend to remove them. 
@tillrohrmann 
   
   2. `"JobManagerITCase.The JobManager actor must handle trigger savepoint 
response after trigger savepoint failure"` and `"JobManagerITCase.The 
JobManager actor must handle failed savepoint triggering"`: these two tests 
check the "message" of a failed savepoint trigger. Since FLIP-6 is not directly 
based on Akka, I don't think these checks are necessary, so I tend to remove 
them as well. However, I'd like to know whether we already have tests covering 
the negative savepoint cases. @tzulitai 




> Port JobManagerITCase to new code base
> --
>
> Key: FLINK-10508
> URL: https://issues.apache.org/jira/browse/FLINK-10508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobManagerITCase}} to new code base.







[jira] [Assigned] (FLINK-10532) Broken links in documentation

2018-10-12 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-10532:


Assignee: Timo Walther

> Broken links in documentation
> -
>
> Key: FLINK-10532
> URL: https://issues.apache.org/jira/browse/FLINK-10532
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.7.0
>
>
> https://travis-ci.org/apache/flink/builds/440115490#L599
> {code:java}
> http://localhost:4000/dev/stream/operators.html:
> Remote file does not exist -- broken link!!!
> --
> http://localhost:4000/dev/table/streaming/sql.html:
> Remote file does not exist -- broken link!!!
> http://localhost:4000/dev/table/streaming.html:
> Remote file does not exist -- broken link!!!{code}





[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648269#comment-16648269
 ] 

ASF GitHub Bot commented on FLINK-10423:


sjwiesman commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224876736
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+   try {
+   MetricGroup group = 
metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new 
RocksDBNativeMetricView(
+   property,
+   handle,
+   db
+   );
+
+   group.gauge(property, gauge);
+   registeredGauges.registerCloseable(gauge);
+   }
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Unable to register 
native metrics with RocksDB", e);
+   }
+   }
+
+   @Override
+   public void close() {
+   IOUtils.closeQuietly(registeredGauges);
+   IOUtils.closeQuietly(lease);
+   }
+
+   static class RocksDBNativeMetricView implements Gauge<Long>, View, 
Closeable {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+   private final String property;
+
+   private final ColumnFamilyHandle handle;
+
+   private final RocksDB db;
+
+   private volatile boolean open;
+
+   private long value;
+
+   private RocksDBNativeMetricView(
+   @Nonnull String property,
+   @Nonnull 
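The file under review above centers on one pattern: register a gauge per native
property whose `update()` pulls the current value from RocksDB and whose
`getValue()` serves the cached value to metric reporters, with a registry that
closes all gauges together. A minimal self-contained sketch of that pattern
(plain Java, no Flink or RocksDB dependency; `NativeMetricView`, the `Gauge`
and `View` interfaces here, and the property name are toy stand-ins):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Toy sketch (hypothetical names, no Flink dependency) of the pattern used by
 * the monitor under review: update() pulls the current value from the native
 * source, getValue() serves the cached value, and close() stops further pulls.
 */
public class NativeMetricSketch {
    interface Gauge { long getValue(); }
    interface View { void update(); }

    /** Caches the last value pulled from the underlying source. */
    static class NativeMetricView implements Gauge, View, AutoCloseable {
        private final LongSupplier source;   // stands in for db.getLongProperty(handle, property)
        private volatile boolean open = true;
        private volatile long value;

        NativeMetricView(LongSupplier source) { this.source = source; }

        @Override public void update() { if (open) { value = source.getAsLong(); } }
        @Override public long getValue() { return value; }
        @Override public void close() { open = false; }
    }

    static long[] demo() {
        // Fake "native" storage: property name -> current value.
        Map<String, Long> nativeProps = new HashMap<>();
        nativeProps.put("estimate-num-keys", 42L);

        NativeMetricView gauge =
                new NativeMetricView(() -> nativeProps.get("estimate-num-keys"));

        gauge.update();                   // a timer thread would call this periodically
        long before = gauge.getValue();   // 42

        gauge.close();
        nativeProps.put("estimate-num-keys", 99L);
        gauge.update();                   // ignored once closed
        long after = gauge.getValue();    // still 42

        return new long[] { before, after };
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println(r[0] + "," + r[1]); // prints 42,42
    }
}
```

Decoupling the reporter-facing `getValue()` from the native read is the design
point: reporters never block on a native call, and closing the view before the
database fences off use-after-close.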


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648194#comment-16648194
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224864142
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+   try {
+   MetricGroup group = 
metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new 
RocksDBNativeMetricView(
+   property,
+   handle,
+   db
+   );
+
+   group.gauge(property, gauge);
+   registeredGauges.registerCloseable(gauge);
+   }
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Unable to register 
native metrics with RocksDB", e);
+   }
+   }
+
+   @Override
+   public void close() {
+   IOUtils.closeQuietly(registeredGauges);
+   IOUtils.closeQuietly(lease);
+   }
+
+   static class RocksDBNativeMetricView implements Gauge<Long>, View, 
Closeable {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+   private final String property;
+
+   private final ColumnFamilyHandle handle;
+
+   private final RocksDB db;
+
+   private volatile boolean open;
+
+   private long value;
+
+   private RocksDBNativeMetricView(
+   @Nonnull String property,
+   


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648180#comment-16648180
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224860246
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+   try {
+   MetricGroup group = 
metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new 
RocksDBNativeMetricView(
+   property,
+   handle,
+   db
+   );
+
+   group.gauge(property, gauge);
+   registeredGauges.registerCloseable(gauge);
+   }
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Unable to register 
native metrics with RocksDB", e);
+   }
+   }
+
+   @Override
+   public void close() {
+   IOUtils.closeQuietly(registeredGauges);
+   IOUtils.closeQuietly(lease);
+   }
+
+   static class RocksDBNativeMetricView implements Gauge<Long>, View, 
Closeable {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+   private final String property;
+
+   private final ColumnFamilyHandle handle;
+
+   private final RocksDB db;
+
+   private volatile boolean open;
+
+   private long value;
+
+   private RocksDBNativeMetricView(
+   @Nonnull String property,
+   


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648178#comment-16648178
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224860246
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) {
+       try {
+           MetricGroup group = metricGroup.addGroup(columnFamilyName);
+
+           for (String property : options.getProperties()) {
+               RocksDBNativeMetricView gauge = new RocksDBNativeMetricView(
+                   property,
+                   handle,
+                   db
+               );
+
+               group.gauge(property, gauge);
+               registeredGauges.registerCloseable(gauge);
+           }
+       } catch (IOException e) {
+           throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e);
+       }
+   }
+
+   @Override
+   public void close() {
+       IOUtils.closeQuietly(registeredGauges);
+       IOUtils.closeQuietly(lease);
+   }
+
+   static class RocksDBNativeMetricView implements Gauge, View, Closeable {
+       private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+   private final String property;
+
+   private final ColumnFamilyHandle handle;
+
+   private final RocksDB db;
+
+   private volatile boolean open;
+
+   private long value;
+
+   private RocksDBNativeMetricView(
+   @Nonnull String property,
+   
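One detail from the class javadoc above is worth unpacking: the properties are unsigned 64-bit values, while Java's `long` is signed, so a raw value with the high bit set renders negative unless printed with the unsigned helpers. A small illustration (not Flink code):

```java
public class UnsignedLongSketch {
    // Render a RocksDB-style unsigned 64-bit property value for display.
    static String render(long raw) {
        return Long.toUnsignedString(raw);
    }

    public static void main(String[] args) {
        long raw = -1L; // bit pattern 0xFFFFFFFFFFFFFFFF, i.e. 2^64 - 1 unsigned
        System.out.println(raw);          // prints -1
        System.out.println(render(raw));  // prints 18446744073709551615
    }
}
```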

[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-12 Thread GitBox
StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224856170
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -606,6 +627,17 @@ private RocksDB openDB(
	Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
		"Not all requested column family handles have been created");
 
+	if (this.metricOptions.isEnabled()) {
+		this.nativeMetricMonitor = new RocksDBNativeMetricMonitor(
+			dbRef,
+			rocksDBResourceGuard,
+			metricOptions,
+			operatorMetricGroup
+		);
+
+		this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor);
 
 Review comment:
   It is not an issue in the sense of a bug, but it is much more explicit. `CloseableRegistry` is typically used to coordinate closing of resources across multiple threads, and I would not use it if we only operate in a single thread.
   
   I think you need to close the metrics as the first thing in `dispose()`, or else it will block. As a rule of thumb, all closing in `dispose()` should happen in the reverse order of creation.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
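The reverse-order rule from the review comment above can be sketched with a toy resource holder (illustrative only, not Flink's actual RocksDBKeyedStateBackend): resources are recorded as they are created, and `dispose()` closes newest-first, so a dependent (the metrics monitor) closes before the resource it depends on (the db).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ReverseCloseSketch {
    // Resources are pushed as they are created; dispose() closes newest-first.
    private final Deque<AutoCloseable> created = new ArrayDeque<>();
    final List<String> closeLog = new ArrayList<>();

    void open(String name) {
        // Record a closeable whose close() logs the resource name.
        created.push(() -> closeLog.add(name));
    }

    // Close in reverse order of creation, swallowing exceptions
    // (comparable in spirit to IOUtils.closeQuietly).
    void dispose() {
        while (!created.isEmpty()) {
            try {
                created.pop().close();
            } catch (Exception ignored) {
                // best-effort close
            }
        }
    }

    public static void main(String[] args) {
        ReverseCloseSketch s = new ReverseCloseSketch();
        s.open("db");
        s.open("metricsMonitor");
        s.dispose();
        System.out.println(s.closeLog); // prints [metricsMonitor, db]
    }
}
```

The deque's LIFO order is what enforces the rule: whatever was opened last is closed first, without the bookkeeping of a multi-threaded registry.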


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648152#comment-16648152
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224856170
 
 


> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb is doing. This work is inspired heavily by the 
> comments on this rocksdb issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648147#comment-16648147
 ] 

ASF GitHub Bot commented on FLINK-10423:


sjwiesman commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224855111
 
 

 ##
 File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+	void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) {
+		try {
+			MetricGroup group = metricGroup.addGroup(columnFamilyName);
+
+			for (String property : options.getProperties()) {
+				RocksDBNativeMetricView gauge = new RocksDBNativeMetricView(
+					property,
+					handle,
+					db
+				);
+
+				group.gauge(property, gauge);
+				registeredGauges.registerCloseable(gauge);
+			}
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e);
+		}
+	}
+
+	@Override
+	public void close() {
+		IOUtils.closeQuietly(registeredGauges);
+		IOUtils.closeQuietly(lease);
+	}
+
+	static class RocksDBNativeMetricView implements Gauge, View, Closeable {
+		private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+		private final String property;
+
+		private final ColumnFamilyHandle handle;
+
+		private final RocksDB db;
+
+		private volatile boolean open;
+
+		private long value;
+
+		private RocksDBNativeMetricView(
+			@Nonnull String property,
+			@Nonnull ColumnFamilyHandle handle,
+			@Nonnull RocksDB db
+		) {
+			this.property = property;
+			this.handle = handle;
+			this.db = db;

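
For readers following the review, the polling pattern above can be sketched in isolation: the reporter periodically calls `update()` to pull a fresh value from the native side, while `getValue()` only returns the cached copy, and `close()` stops further native access. This is a minimal self-contained sketch; `NativeMetricView` and the `PropertySource` interface are hypothetical stand-ins for `RocksDBNativeMetricView` and the RocksDB JNI handle, not Flink's actual API.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the RocksDB JNI call db.getLongProperty(handle, property).
interface PropertySource {
    long getLongProperty(String property);
}

// Minimal sketch of the Gauge/View pattern from the review: update() pulls the
// native value on a fixed interval; getValue() only returns the cached copy,
// so metric reads never touch native code directly.
class NativeMetricView implements Closeable {
    private final String property;
    private final PropertySource source;
    private volatile boolean open = true;
    private volatile long value;

    NativeMetricView(String property, PropertySource source) {
        this.property = property;
        this.source = source;
    }

    // Called by the metrics view updater thread.
    void update() {
        if (open) {
            value = source.getLongProperty(property);
        }
    }

    long getValue() {
        return value;
    }

    @Override
    public void close() {
        open = false; // stop touching the (possibly disposed) native handle
    }
}

public class NativeMetricViewDemo {
    public static void main(String[] args) {
        AtomicLong backing = new AtomicLong(7);
        NativeMetricView view = new NativeMetricView("rocksdb.estimate-num-keys",
                p -> backing.get());

        view.update();
        System.out.println(view.getValue()); // 7

        view.close();
        backing.set(42);
        view.update();                       // no-op after close
        System.out.println(view.getValue()); // still 7
    }
}
```

The `volatile boolean open` mirrors the guard in the reviewed class: after close, a late `update()` from the reporter thread must not reach the freed native resource.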
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648146#comment-16648146
 ] 

ASF GitHub Bot commented on FLINK-10423:


sjwiesman commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224854199
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -606,6 +627,17 @@ private RocksDB openDB(
 	Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
 		"Not all requested column family handles have been created");
 
+	if (this.metricOptions.isEnabled()) {
+		this.nativeMetricMonitor = new RocksDBNativeMetricMonitor(
+			dbRef,
+			rocksDBResourceGuard,
+			metricOptions,
+			operatorMetricGroup
+		);
+
+		this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor);
 
 Review comment:
   Why is this an issue? The cancelStreamRegistry is closed in 
`AbstractKeyedStateBackend#dispose`.  I manually called close inside of 
`dispose` initially but I was concerned about the strict ordering it imposed. 
If the nativeMetricsWatcher wasn't closed first `rocksDBResourceGuard.close()` 
would block forever. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb is doing. This work is inspired heavily by the 
> comments on this rocksdb issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
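
The ordering concern discussed in this comment can be modeled in a few lines: a guard whose `close()` blocks until every outstanding lease has been returned, which is why the lease-holding monitor must be closed before the guard. This is a simplified sketch; `MiniResourceGuard` is a hypothetical stand-in and Flink's actual `ResourceGuard` differs in detail.

```java
import java.io.Closeable;

// Simplified model of a resource guard: close() waits until all leases are
// released. Anything holding a lease (like the native metric monitor) must
// be closed *before* the guard, or close() blocks forever.
class MiniResourceGuard {
    private int leases = 0;
    private boolean closed = false;

    synchronized Closeable acquireResource() {
        leases++;
        return this::releaseResource; // the lease; closing it returns it
    }

    private synchronized void releaseResource() {
        leases--;
        notifyAll(); // wake a blocked close()
    }

    synchronized void close() throws InterruptedException {
        while (leases > 0) {
            wait(); // guarded block: wait until every lease is back
        }
        closed = true;
    }

    synchronized boolean isClosed() {
        return closed;
    }
}

public class GuardOrderingDemo {
    public static void main(String[] args) throws Exception {
        MiniResourceGuard guard = new MiniResourceGuard();
        Closeable lease = guard.acquireResource();

        // Correct ordering: release the lease first, then close the guard.
        lease.close();
        guard.close(); // returns immediately, no leases outstanding
        System.out.println(guard.isClosed()); // true
    }
}
```

Reversing the last two calls on a single thread would never return, which is the deadlock the reviewer is avoiding by letting the registry close the monitor before the guard is closed.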


[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648142#comment-16648142
 ] 

ASF GitHub Bot commented on FLINK-10423:


sjwiesman commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224855415
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
 
 Review comment:
   Agreed, I am going to push the lease into the gauge itself. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Forward RocksDB native metrics to Flink metrics reporter 
> -
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics, State Backends, Checkpointing
>Reporter: Seth Wiesman
>Assignee: Seth Wiesman
>Priority: Major
>  Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current 
> memory usage, open memtables,  etc that would be useful to users wishing 
> greater insight what rocksdb is doing. This work is inspired heavily by the 
> comments on this rocksdb issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
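
The refactoring agreed above ("push the lease into the gauge itself") means each gauge owns its lease and returns it on close, so closing the registered gauges is enough to let the guard's blocking close proceed. A minimal sketch under that assumption; `LeasedMetricView` is a hypothetical name and the lease is modeled as a plain `Closeable`:

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch: the gauge owns its lease on the native RocksDB resource and
// releases it in close(), instead of the monitor holding one shared lease.
class LeasedMetricView implements Closeable {
    private final Closeable lease;  // lease acquired from the resource guard
    private volatile boolean open = true;
    private volatile long value;

    LeasedMetricView(Closeable lease) {
        this.lease = lease;
    }

    void update(long freshNativeValue) {
        if (open) {
            value = freshNativeValue;
        }
    }

    long getValue() {
        return value;
    }

    @Override
    public void close() throws IOException {
        open = false;
        lease.close(); // return this gauge's lease to the guard
    }
}

public class LeasedViewDemo {
    public static void main(String[] args) throws IOException {
        boolean[] released = {false};
        LeasedMetricView view = new LeasedMetricView(() -> released[0] = true);

        view.update(13);
        view.close();
        System.out.println(view.getValue()); // 13
        System.out.println(released[0]);     // true
    }
}
```

With this shape, closing the `CloseableRegistry` of gauges transitively releases every lease, removing the strict close-ordering dependency between the monitor and the guard.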


[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648131#comment-16648131
 ] 

ASF GitHub Bot commented on FLINK-10516:


yanyan300300 closed pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
URL: https://github.com/apache/flink/pull/6828
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 462682f7e5f..f3dd27b5ef0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -67,6 +68,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -160,6 +162,13 @@ protected int run(String[] args) {
 
	final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties);
 
+	// configure the filesystems
+	try {
+		FileSystem.initialize(flinkConfig);
+	} catch (IOException e) {
+		throw new IOException("Error while configuring the filesystems.", e);
+	}
+
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
if (remoteKeytabPrincipal != null && f.exists()) {
String keytabPath = f.getAbsolutePath();
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
index b15374b2a4b..929dbdbce3b 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -19,9 +19,12 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.OperatingSystem;
 
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Assume;
@@ -29,9 +32,14 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +56,7 @@
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
@@ -56,6 +65,8 @@
 /**
  * Tests for the {@link YarnApplicationMasterRunner}.
  */
+@PrepareForTest(FileSystem.class)
+@RunWith(PowerMockRunner.class)
 public class YarnApplicationMasterRunnerTest {
private static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class);
 
@@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
taskManagerConf, workingDirectory, 
taskManagerMainClass, LOG);
assertEquals("file", 
ctx.getLocalResources().get("flink.jar").getResource().getScheme());
}
+
+   @Test
+   public void testRunAndInitializeFileSystem() throws 

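The patch above initializes Flink's `FileSystem` with the cluster configuration before any file access, re-wrapping the resulting `IOException` with context. A framework-free sketch of that wrap-with-context pattern (the `Config` map and `initialize` method below are illustrative stand-ins, not Flink's actual API):

```java
import java.io.IOException;
import java.util.Map;

// Hypothetical stand-in for a filesystem layer that must be configured
// before first use; mirrors the FileSystem.initialize(flinkConfig) call
// added in the patch above.
public class FsInitSketch {
    static void initialize(Map<String, String> config) throws IOException {
        if (!config.containsKey("fs.default-scheme")) {
            throw new IOException("no default filesystem scheme configured");
        }
    }

    static void setup(Map<String, String> config) throws IOException {
        try {
            initialize(config);
        } catch (IOException e) {
            // Re-wrap so the failure clearly points at filesystem setup,
            // as the patch does with its own message.
            throw new IOException("Error while configuring the filesystems.", e);
        }
    }

    public static void main(String[] args) {
        try {
            setup(Map.of());
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The wrapped exception keeps the original cause attached, so the stack trace still shows the underlying failure.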
[GitHub] yanyan300300 closed pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup

2018-10-12 Thread GitBox
yanyan300300 closed pull request #6828: [FLINK-10516] [yarn] fix 
YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink 
Configuration during setup
URL: https://github.com/apache/flink/pull/6828
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 462682f7e5f..f3dd27b5ef0 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -67,6 +68,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -160,6 +162,13 @@ protected int run(String[] args) {
 
final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
 
+   // configure the filesystems
+   try {
+   FileSystem.initialize(flinkConfig);
+   } catch (IOException e) {
+   throw new IOException("Error while configuring 
the filesystems.", e);
+   }
+
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
if (remoteKeytabPrincipal != null && f.exists()) {
String keytabPath = f.getAbsolutePath();
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
index b15374b2a4b..929dbdbce3b 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -19,9 +19,12 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.OperatingSystem;
 
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Assume;
@@ -29,9 +32,14 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +56,7 @@
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
@@ -56,6 +65,8 @@
 /**
  * Tests for the {@link YarnApplicationMasterRunner}.
  */
+@PrepareForTest(FileSystem.class)
+@RunWith(PowerMockRunner.class)
 public class YarnApplicationMasterRunnerTest {
private static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class);
 
@@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
taskManagerConf, workingDirectory, 
taskManagerMainClass, LOG);
assertEquals("file", 
ctx.getLocalResources().get("flink.jar").getResource().getScheme());
}
+
+   @Test
+   public void testRunAndInitializeFileSystem() throws Exception {
+   // Mock necessary system variables
+   Map<String, String> map = new HashMap<>(System.getenv());
+   map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
+   // Create dynamic properties to be used 

[GitHub] sunjincheng121 commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-12 Thread GitBox
sunjincheng121 commented on issue #6805: [FLINK-10156][table] Deprecate 
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429392078
 
 
   Thanks for the update @fhueske !
   The current change LGTM.
   +1 to merged. 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648120#comment-16648120
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on issue #6805: [FLINK-10156][table] Deprecate 
Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429392078
 
 
   Thanks for the update @fhueske !
   The current change LGTM.
   +1 to merged. 
   




> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
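The `insertInto()` style argued for in the issue separates registering a sink under a name (a catalog concern) from emitting a table into that name (a query concern). A minimal, framework-free sketch of that separation; the `CatalogSketch` and `Sink` names are illustrative, not Flink's API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy "catalog": sinks are registered once under a name, and queries emit
// into a name (the insertInto() style) rather than receiving a sink
// instance directly (the writeToSink() style being deprecated above).
public class CatalogSketch {
    interface Sink { void emit(String row); }

    private final Map<String, Sink> catalog = new HashMap<>();

    void registerSink(String name, Sink sink) {
        catalog.put(name, sink);
    }

    void insertInto(String name, List<String> rows) {
        Sink sink = catalog.get(name);
        if (sink == null) {
            throw new IllegalArgumentException("No sink registered as: " + name);
        }
        rows.forEach(sink::emit);
    }

    public static void main(String[] args) {
        CatalogSketch env = new CatalogSketch();
        List<String> received = new ArrayList<>();
        env.registerSink("results", received::add);
        env.insertInto("results", List.of("a", "b"));
        System.out.println(received);  // prints [a, b]
    }
}
```

The query side never touches a sink instance, which is the alignment with SQL's `INSERT INTO ... SELECT` that the issue describes.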


[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-12 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224851062
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -944,7 +956,12 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the 
[[Table]] is written.
 */
   def insertInto(tableName: String): Unit = {
-tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
+  case _ =>
+tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
 
 Review comment:
   If we check for NULL we should throw an exception, but I think you're right. I agree to keep the current approach.


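The Scala change under discussion pattern-matches on the logical plan and rejects a bare table-function call before dispatching to `tableEnv.insertInto(...)`. The same guard-before-dispatch shape, sketched in plain Java with stand-in plan types (not Flink's actual classes):

```java
// Stand-in plan types mirroring the match in table.scala above: a bare
// table-function call may not be written to a sink directly.
public class InsertGuardSketch {
    interface LogicalPlan {}
    static class Scan implements LogicalPlan {}
    static class TableFunctionCall implements LogicalPlan {}

    static String insertInto(LogicalPlan plan, String tableName) {
        if (plan instanceof TableFunctionCall) {
            // Corresponds to the ValidationException thrown in the patch.
            throw new IllegalStateException(
                "TableFunction can only be used in join and leftOuterJoin.");
        }
        // Corresponds to the tableEnv.insertInto(...) dispatch.
        return "INSERT INTO " + tableName;
    }

    public static void main(String[] args) {
        System.out.println(insertInto(new Scan(), "sink_table"));
    }
}
```

Validating the plan shape up front gives the user a targeted error instead of a failure deep inside the sink writer.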


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648116#comment-16648116
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224851062
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -944,7 +956,12 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the 
[[Table]] is written.
 */
   def insertInto(tableName: String): Unit = {
-tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
+  case _ =>
+tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
 
 Review comment:
   If we check for NULL we should throw an exception, but I think you're right. I agree to keep the current approach.




> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  





[jira] [Commented] (FLINK-10398) Add Tanh math function supported in Table API and SQL

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648092#comment-16648092
 ] 

ASF GitHub Bot commented on FLINK-10398:


yanghua commented on a change in pull request #6736: [FLINK-10398][table] Add 
Tanh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6736#discussion_r224845010
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1219,6 +1219,18 @@ TAN(numeric)
   
 
 
+
+  
+{% highlight text %}
+TANH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic tangent of NUMERIC. 
+Return value type is DOUBLE.
 
 Review comment:
   In fact, I just followed the [same 
pattern](https://github.com/apache/flink/pull/6700/files#diff-51c7d3bf736c60e8b942791a3715cbffR1284)
 as the PR of cosh, which looks good to Piotr, and according to Piotr's advice: 
[the type is best written 
uniformly](https://github.com/apache/flink/pull/6700#discussion_r220892667), 
and you haven't objected to this point before. But here you suggest I change it 
to lowercase. Your guidelines seem vague.
   
   However, there is also my problem. I should have made these functions in the 
same PR to make the change more effective. There are a few points that I 
probably didn't pay much attention to, but I think many of the points you 
mentioned earlier are about documentation. Documentation wording, however, has 
no definite criteria; what you consider standard, another person may disagree 
with. So it caused a lot of back-and-forth.
   
   Plus, if you're willing to review, I think it's more contributor-friendly 
for you to give a definite review conclusion once (except for new problems 
arising from secondary changes). I feel that each time you give a partial 
review suggestion (some of which are not related to my last change), it also 
creates more back-and-forth.




> Add Tanh math function supported in Table API and SQL
> -
>
> Key: FLINK-10398
> URL: https://issues.apache.org/jira/browse/FLINK-10398
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> refer to : https://www.techonthenet.com/oracle/functions/tanh.php



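For reference, the semantics being documented match `java.lang.Math`, which presumably backs the new function: `TANH(numeric)` returns the double-valued hyperbolic tangent, and tanh(x) = sinh(x)/cosh(x). A quick numeric check:

```java
public class TanhSketch {
    public static void main(String[] args) {
        double x = 0.5;
        double tanh = Math.tanh(x);
        // tanh(x) equals sinh(x)/cosh(x); all three are double-valued.
        double ratio = Math.sinh(x) / Math.cosh(x);
        System.out.println(Math.abs(tanh - ratio) < 1e-12);  // prints true
        System.out.println(Math.tanh(0.0) == 0.0);           // prints true
    }
}
```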


[GitHub] yanghua commented on a change in pull request #6736: [FLINK-10398][table] Add Tanh math function supported in Table API and SQL

2018-10-12 Thread GitBox
yanghua commented on a change in pull request #6736: [FLINK-10398][table] Add 
Tanh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6736#discussion_r224845010
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1219,6 +1219,18 @@ TAN(numeric)
   
 
 
+
+  
+{% highlight text %}
+TANH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic tangent of NUMERIC. 
+Return value type is DOUBLE.
 
 Review comment:
   In fact, I just followed the [same 
pattern](https://github.com/apache/flink/pull/6700/files#diff-51c7d3bf736c60e8b942791a3715cbffR1284)
 as the PR of cosh, which looks good to Piotr, and according to Piotr's advice: 
[the type is best written 
uniformly](https://github.com/apache/flink/pull/6700#discussion_r220892667), 
and you haven't objected to this point before. But here you suggest I change it 
to lowercase. Your guidelines seem vague.
   
   However, there is also my problem. I should have made these functions in the 
same PR to make the change more effective. There are a few points that I 
probably didn't pay much attention to, but I think many of the points you 
mentioned earlier are about documentation. Documentation wording, however, has 
no definite criteria; what you consider standard, another person may disagree 
with. So it caused a lot of back-and-forth.
   
   Plus, if you're willing to review, I think it's more contributor-friendly 
for you to give a definite review conclusion once (except for new problems 
arising from secondary changes). I feel that each time you give a partial 
review suggestion (some of which are not related to my last change), it also 
creates more back-and-forth.




[jira] [Commented] (FLINK-10508) Port JobManagerITCase to new code base

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648089#comment-16648089
 ] 

ASF GitHub Bot commented on FLINK-10508:


TisonKun opened a new pull request #6834: [WIP] [FLINK-10508] [tests] Port 
JobManagerITCase to new code base
URL: https://github.com/apache/flink/pull/6834
 
 
   
   
   ## What is the purpose of the change
   
   Port `JobManagerITCase` to new code base
   
   ## Brief change log
   
   Technically, the legacy `JobManagerITCase` tests a series of job executions 
(different jobs, succeeding or failing) by starting a `TestingCluster`. Thus I 
think the most convenient way to port this test is to use `MiniCluster` and run 
the ITCase on it. Following this thought, I ported the test to `MiniClusterITCase`.
   
   
   ## Verifying this change
   
   Changes themselves are tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**no**)
   




> Port JobManagerITCase to new code base
> --
>
> Key: FLINK-10508
> URL: https://issues.apache.org/jira/browse/FLINK-10508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobManagerITCase}} to new code base.





[jira] [Updated] (FLINK-10508) Port JobManagerITCase to new code base

2018-10-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10508:
---
Labels: pull-request-available  (was: )

> Port JobManagerITCase to new code base
> --
>
> Key: FLINK-10508
> URL: https://issues.apache.org/jira/browse/FLINK-10508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobManagerITCase}} to new code base.





[GitHub] TisonKun opened a new pull request #6834: [WIP] [FLINK-10508] [tests] Port JobManagerITCase to new code base

2018-10-12 Thread GitBox
TisonKun opened a new pull request #6834: [WIP] [FLINK-10508] [tests] Port 
JobManagerITCase to new code base
URL: https://github.com/apache/flink/pull/6834
 
 
   
   
   ## What is the purpose of the change
   
   Port `JobManagerITCase` to new code base
   
   ## Brief change log
   
   Technically, the legacy `JobManagerITCase` tests a series of job executions 
(different jobs, succeeding or failing) by starting a `TestingCluster`. Thus I 
think the most convenient way to port this test is to use `MiniCluster` and run 
the ITCase on it. Following this thought, I ported the test to `MiniClusterITCase`.
   
   
   ## Verifying this change
   
   Changes themselves are tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (**no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**no**)
 - The serializers: (**no**)
 - The runtime per-record code paths (performance sensitive): (**no**)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
 - The S3 file system connector: (**no**)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**no**)
 - If yes, how is the feature documented? (**no**)
   




[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648078#comment-16648078
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

yanghua commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-429376594
 
 
   @aljoscha OK, I will refactor the commits and commit messages.




> Provide connector for Kafka 2.0.0
> -
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Kafka 2.0.0 would be released soon.
> Here is vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0





[GitHub] yanghua commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0

2018-10-12 Thread GitBox
yanghua commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-429376594
 
 
   @aljoscha OK, I will refactor the commits and commit messages.




[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648072#comment-16648072
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 
2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-429374391
 
 
   @yanghua Thanks for iterating on this so quickly! I will do a last thorough 
review next week and then hopefully merge.
   
   To make the reviewing easier you could restructure the commits a bit. All 
prerequisite work should go into isolated commits and then the last commit 
should only have the new files for the Kafka 2.0 connector. For example the 
dependency change would be separate, or renaming the `KafkaTableSource` to 
`KafkaTableSourceBase` can be separate to more clearly see what each change 
does. This makes it way easier in the future when someone looks at the changes 
and tries to figure out what was going on.




> Provide connector for Kafka 2.0.0
> -
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Kafka 2.0.0 would be released soon.
> Here is vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0





[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648069#comment-16648069
 ] 

ASF GitHub Bot commented on FLINK-10384:


xccui commented on a change in pull request #6730: [FLINK-10384][table] Add 
Sinh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#discussion_r224835409
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1197,6 +1197,18 @@ SIN(numeric)
   
 
 
+
+  
+{% highlight text %}
+SINH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic sine of NUMERIC. 
+Return value type is DOUBLE.
 
 Review comment:
   Change `NUMERIC` to lowercase.
   
   The return type is ...




> Add Sinh math function supported in Table API and SQL
> -
>
> Key: FLINK-10384
> URL: https://issues.apache.org/jira/browse/FLINK-10384
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> like FLINK-10340 for adding Cosh math function





[jira] [Commented] (FLINK-10398) Add Tanh math function supported in Table API and SQL

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648070#comment-16648070
 ] 

ASF GitHub Bot commented on FLINK-10398:


xccui commented on a change in pull request #6736: [FLINK-10398][table] Add 
Tanh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6736#discussion_r224835500
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1219,6 +1219,18 @@ TAN(numeric)
   
 
 
+
+  
+{% highlight text %}
+TANH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic tangent of NUMERIC. 
+Return value type is DOUBLE.
 
 Review comment:
   Change the `NUMERIC` to lowercase here.
   
   The return type is ...
   
   Please ALWAYS be careful with these minor issues because they have been 
pointed out many times.




> Add Tanh math function supported in Table API and SQL
> -
>
> Key: FLINK-10398
> URL: https://issues.apache.org/jira/browse/FLINK-10398
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> refer to : https://www.techonthenet.com/oracle/functions/tanh.php





[GitHub] xccui commented on a change in pull request #6736: [FLINK-10398][table] Add Tanh math function supported in Table API and SQL

2018-10-12 Thread GitBox
xccui commented on a change in pull request #6736: [FLINK-10398][table] Add 
Tanh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6736#discussion_r224835500
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1219,6 +1219,18 @@ TAN(numeric)
   
 
 
+
+  
+{% highlight text %}
+TANH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic tangent of NUMERIC. 
+Return value type is DOUBLE.
 
 Review comment:
   Change the `NUMERIC` to lowercase here.
   
   The return type is ...
   
  Please ALWAYS be careful with these minor issues because they have been 
pointed out many times.




With regards,
Apache Git Services


[GitHub] aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0

2018-10-12 Thread GitBox
aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 
2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-429374391
 
 
   @yanghua Thanks for iterating on this so quickly! I will do a last thorough 
review next week and then hopefully merge.
   
   To make reviewing easier, you could restructure the commits a bit. All 
prerequisite work should go into isolated commits, and then the last commit 
should only contain the new files for the Kafka 2.0 connector. For example, the 
dependency change could be separate, and renaming `KafkaTableSource` to 
`KafkaTableSourceBase` could be a separate commit, to show more clearly what 
each change does. This makes it much easier in the future when someone looks at 
the changes and tries to figure out what happened.




[GitHub] xccui commented on a change in pull request #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL

2018-10-12 Thread GitBox
xccui commented on a change in pull request #6730: [FLINK-10384][table] Add 
Sinh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6730#discussion_r224835409
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1197,6 +1197,18 @@ SIN(numeric)
   
 
 
+
+  
+{% highlight text %}
+SINH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic sine of NUMERIC. 
+Return value type is DOUBLE.
 
 Review comment:
   Change `NUMERIC` to lowercase.
   
   The return type is ...


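Like TANH above, the SINH function being documented (hyperbolic sine of a numeric argument, double result) can be checked against its defining formula. A hedged Python sketch, not Flink code:

```python
import math

def sinh_like_sql(numeric):
    """Mimic SQL SINH(numeric): the hyperbolic sine, returned as a double."""
    return math.sinh(float(numeric))

# sinh(x) = (e^x - e^-x) / 2; check against the defining formula
x = 1.25
assert abs(sinh_like_sql(x) - (math.exp(x) - math.exp(-x)) / 2) < 1e-12

# sinh is an odd function: sinh(-x) == -sinh(x)
assert abs(sinh_like_sql(-x) + sinh_like_sql(x)) < 1e-12
```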


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648049#comment-16648049
 ] 

ASF GitHub Bot commented on FLINK-10156:


fhueske commented on a change in pull request #6805: [FLINK-10156][table] 
Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224831240
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -944,7 +956,12 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the 
[[Table]] is written.
 */
   def insertInto(tableName: String): Unit = {
-tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
+  case _ =>
+tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
 
 Review comment:
   The check basically replaces the null check but provides more context. 
   What would we do if `tableEnv` were `null`? Do nothing? Throw an exception? 
What would the error message be? By checking the type of the root node of 
the logical plan, we know why the call would not succeed and can give an 
appropriate error message. 
   
   We could of course also pass `null` as the configuration and check for it 
in `insertInto(String, QueryConfig)`, but I'd consider that an invalid 
use of the interface that should also result in an exception.




> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
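The register-then-`insertInto` pattern argued for above (sinks live in the catalog; queries refer to them by name, as in `INSERT INTO ... SELECT`) can be illustrated with a toy catalog. This is a hedged Python sketch of the design idea only — the class and method names are invented and are not Flink's actual API:

```python
class ToyTableEnvironment:
    """Toy catalog illustrating insertInto(name) vs. writeToSink(sink).

    Sinks are registered once with the environment; queries then refer
    to them by name, mirroring SQL's INSERT INTO ... SELECT.
    """

    def __init__(self):
        self._sinks = {}

    def register_table_sink(self, name, sink):
        self._sinks[name] = sink

    def insert_into(self, rows, name):
        # Fail early with a clear message if the sink was never registered
        if name not in self._sinks:
            raise KeyError(f"No table sink registered under '{name}'")
        self._sinks[name].extend(rows)

env = ToyTableEnvironment()
results = []                      # a plain list standing in for a real sink
env.register_table_sink("out", results)
env.insert_into([(1, "a"), (2, "b")], "out")
assert results == [(1, "a"), (2, "b")]
```

The query side never touches a sink instance directly, which is the point made in favor of dropping `writeToSink()`.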


[GitHub] fhueske commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-12 Thread GitBox
fhueske commented on a change in pull request #6805: [FLINK-10156][table] 
Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224831240
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -944,7 +956,12 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the 
[[Table]] is written.
 */
   def insertInto(tableName: String): Unit = {
-tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
+  case _ =>
+tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
 
 Review comment:
   The check basically replaces the null check but provides more context. 
   What would we do if `tableEnv` were `null`? Do nothing? Throw an exception? 
What would the error message be? By checking the type of the root node of 
the logical plan, we know why the call would not succeed and can give an 
appropriate error message. 
   
   We could of course also pass `null` as the configuration and check for it 
in `insertInto(String, QueryConfig)`, but I'd consider that an invalid 
use of the interface that should also result in an exception.


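The design point debated in this thread — validating the root of the logical plan to produce a precise error, rather than null-checking `tableEnv` — can be sketched generically. The classes below are hypothetical Python stand-ins for the Scala case classes, not Flink code:

```python
class LogicalNode: ...
class LogicalTableFunctionCall(LogicalNode): ...
class Project(LogicalNode): ...

def insert_into(plan, table_name):
    """Reject plans rooted at a table-function call with a precise message,
    instead of letting a null/attribute error surface deeper in the stack."""
    if isinstance(plan, LogicalTableFunctionCall):
        raise ValueError(
            "TableFunction can only be used in join and leftOuterJoin.")
    return f"INSERT INTO {table_name}"

assert insert_into(Project(), "sink") == "INSERT INTO sink"
try:
    insert_into(LogicalTableFunctionCall(), "sink")
except ValueError as e:
    assert "join" in str(e)
```

Checking the plan's root type carries the "why", which is what a bare null check on `tableEnv` cannot express.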


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648041#comment-16648041
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224828122
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 ##
 @@ -177,10 +195,16 @@ class TableSinkITCase extends AbstractTestBase {
   .assignAscendingTimestamps(_._1.toLong)
   .toTable(tEnv, 'id, 'num, 'text)
 
+tEnv.registerTableSink(
+  "retractSink",
+  new TestRetractSink().configure(
+Array[String]("len", "icnt", "nsum"),
+Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
 t.select('id, 'num, 'text.charLength() as 'len)
   .groupBy('len)
-  .select('len, 'id.count, 'num.sum)
-  .writeToSink(new TestRetractSink)
+  .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
 
 Review comment:
   Sounds good to me.




> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648039#comment-16648039
 ] 

ASF GitHub Bot commented on FLINK-10156:


sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224827916
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -944,7 +956,12 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the 
[[Table]] is written.
 */
   def insertInto(tableName: String): Unit = {
-tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
+  case _ =>
+tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
 
 Review comment:
   I feel that putting the same code in the two overloaded methods is not the 
best way. We should check whether `tableEnv` is null before we call 
`insertInto(tableName, this.tableEnv.queryConfig)`. Does that make sense to 
you?




> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>  
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a 
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>  
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported 
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a 
> table into a table that was previously registered with a {{TableSink}} in the 
> catalog. It is the inverse method to the {{scan()}} method and the equivalent 
> to an {{INSERT INTO ... SELECT}} SQL query.
>  
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would 
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks 
> (and TableSources) should only be registered with the {{TableEnvironment}} 
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is 
> more aligned with SQL.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648042#comment-16648042
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

yanghua commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-429367779
 
 
   @aljoscha It compiles and the tests pass. What should I do next?




> Provide connector for Kafka 2.0.0
> -
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Kafka 2.0.0 would be released soon.
> Here is vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0

2018-10-12 Thread GitBox
yanghua commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-429367779
 
 
   @aljoscha It compiles and the tests pass. What should I do next?




[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-12 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224828122
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
 ##
 @@ -177,10 +195,16 @@ class TableSinkITCase extends AbstractTestBase {
   .assignAscendingTimestamps(_._1.toLong)
   .toTable(tEnv, 'id, 'num, 'text)
 
+tEnv.registerTableSink(
+  "retractSink",
+  new TestRetractSink().configure(
+Array[String]("len", "icnt", "nsum"),
+Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
 t.select('id, 'num, 'text.charLength() as 'len)
   .groupBy('len)
-  .select('len, 'id.count, 'num.sum)
-  .writeToSink(new TestRetractSink)
+  .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
 
 Review comment:
   Sounds good to me.




[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()

2018-10-12 Thread GitBox
sunjincheng121 commented on a change in pull request #6805: 
[FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224827916
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ##
 @@ -944,7 +956,12 @@ class Table(
 * @param tableName Name of the registered [[TableSink]] to which the 
[[Table]] is written.
 */
   def insertInto(tableName: String): Unit = {
-tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+this.logicalPlan match {
+  case _: LogicalTableFunctionCall =>
+throw new ValidationException("TableFunction can only be used in join 
and leftOuterJoin.")
+  case _ =>
+tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
 
 Review comment:
   I feel that putting the same code in the two overloaded methods is not the 
best way. We should check whether `tableEnv` is null before we call 
`insertInto(tableName, this.tableEnv.queryConfig)`. Does that make sense to 
you?




[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647998#comment-16647998
 ] 

ASF GitHub Bot commented on FLINK-10423:


StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224817432
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+   try {
+   MetricGroup group = 
metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new 
RocksDBNativeMetricView(
+   property,
+   handle,
+   db
+   );
+
+   group.gauge(property, gauge);
+   registeredGauges.registerCloseable(gauge);
+   }
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Unable to register 
native metrics with RocksDB", e);
+   }
+   }
+
+   @Override
+   public void close() {
+   IOUtils.closeQuietly(registeredGauges);
+   IOUtils.closeQuietly(lease);
+   }
+
+   static class RocksDBNativeMetricView implements Gauge, View, 
Closeable {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+   private final String property;
+
+   private final ColumnFamilyHandle handle;
+
+   private final RocksDB db;
+
+   private volatile boolean open;
+
+   private long value;
+
+   private RocksDBNativeMetricView(
+   @Nonnull String property,
+   

[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-12 Thread GitBox
StefanRRichter commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224817432
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+   try {
+   MetricGroup group = 
metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new 
RocksDBNativeMetricView(
+   property,
+   handle,
+   db
+   );
+
+   group.gauge(property, gauge);
+   registeredGauges.registerCloseable(gauge);
+   }
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Unable to register 
native metrics with RocksDB", e);
+   }
+   }
+
+   @Override
+   public void close() {
+   IOUtils.closeQuietly(registeredGauges);
+   IOUtils.closeQuietly(lease);
+   }
+
+   static class RocksDBNativeMetricView implements Gauge, View, 
Closeable {
+   private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+   private final String property;
+
+   private final ColumnFamilyHandle handle;
+
+   private final RocksDB db;
+
+   private volatile boolean open;
+
+   private long value;
+
+   private RocksDBNativeMetricView(
+   @Nonnull String property,
+   @Nonnull ColumnFamilyHandle handle,
+   @Nonnull RocksDB db
+   ) {
+   this.property = property;
+   this.handle = handle;
+   this.db = db;
+
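The gauge pattern in the patch quoted above — a per-property view that pulls a native value on `update()`, serves the cached value from `getValue()`, and goes inert once closed — can be mimicked with plain Python objects. A hedged sketch of the pattern only; the dict stands in for RocksDB's native property interface:

```python
class NativeMetricView:
    """Caches the last value pulled from a (fake) native property,
    mirroring the Gauge+View pattern: update() pulls, get_value() reads."""

    def __init__(self, db, prop):
        self._db = db
        self._prop = prop
        self._open = True
        self._value = 0

    def update(self):
        if self._open:                       # pulls are skipped once closed
            self._value = self._db[self._prop]

    def get_value(self):
        return self._value                   # cheap read of the cached value

    def close(self):
        self._open = False

fake_db = {"rocksdb.estimate-num-keys": 42}  # stand-in for native properties
gauge = NativeMetricView(fake_db, "rocksdb.estimate-num-keys")
gauge.update()
assert gauge.get_value() == 42

gauge.close()
fake_db["rocksdb.estimate-num-keys"] = 99
gauge.update()                               # ignored after close()
assert gauge.get_value() == 42
```

Separating the periodic `update()` from `get_value()` keeps the metric-reporter path off the native call, which is the rationale for implementing both `Gauge` and `View` in the patch.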

[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647990#comment-16647990
 ] 

ASF GitHub Bot commented on FLINK-10423:


zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224761096
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor

2018-10-12 Thread GitBox
zentol commented on a change in pull request #6814: 
[FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224761096
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+   private final CloseableRegistry registeredGauges;
+
+   private final RocksDB db;
+
+   private final ResourceGuard.Lease lease;
+
+   private final RocksDBNativeMetricOptions options;
+
+   private final MetricGroup metricGroup;
+
+   RocksDBNativeMetricMonitor(
+   @Nonnull RocksDB db,
+   @Nonnull ResourceGuard guard,
+   @Nonnull RocksDBNativeMetricOptions options,
+   @Nonnull MetricGroup metricGroup
+   ) throws IOException {
+   this.db = db;
+   this.lease = guard.acquireResource();
+   this.options = options;
+   this.metricGroup = metricGroup;
+
+   this.registeredGauges = new CloseableRegistry();
+   }
+
+   /**
+* Register gauges to pull native metrics for the column family.
+* @param columnFamilyName group name for the new gauges
+* @param handle native handle to the column family
+*/
+   void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) {
+   try {
+   MetricGroup group = metricGroup.addGroup(columnFamilyName);
+
+   for (String property : options.getProperties()) {
+   RocksDBNativeMetricView gauge = new RocksDBNativeMetricView(
+   property,
+   handle,
+   db
+   );
+
+   group.gauge(property, gauge);
+   registeredGauges.registerCloseable(gauge);
+   }
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e);
+   }
+   }
+
+   @Override
+   public void close() {
+   IOUtils.closeQuietly(registeredGauges);
+   IOUtils.closeQuietly(lease);
+   }
+
+   static class RocksDBNativeMetricView implements Gauge<Long>, View, Closeable {
+   private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class);
+
+   private final String property;
+
+   private final ColumnFamilyHandle handle;
+
+   private final RocksDB db;
+
+   private volatile boolean open;
+
+   private long value;
+
+   private RocksDBNativeMetricView(
+   @Nonnull String property,
+   @Nonnull ColumnFamilyHandle handle,
+   @Nonnull RocksDB db
+   ) {
+   this.property = property;
+   this.handle = handle;
+   this.db = db;
+
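The review snippet above is cut off before `RocksDBNativeMetricView#update()`. As a minimal, self-contained sketch of the pull-then-cache gauge pattern the class implements: `update()` refreshes a cached value on the metrics thread while the view is open, and `getValue()` serves reporters from the cache. `NativeSource` below is a hypothetical stand-in for RocksDB's `getLongProperty()`; this is not Flink code.

```java
import java.util.concurrent.atomic.AtomicLong;

public class GaugeSketch {

    /** Hypothetical stand-in for the native store being sampled. */
    static final class NativeSource {
        final AtomicLong counter = new AtomicLong();
        long getLongProperty() {
            return counter.get();
        }
    }

    /** Mirrors the RocksDBNativeMetricView shape sketched above. */
    static final class CachedGauge {
        private final NativeSource source;
        private volatile boolean open = true;
        private volatile long value;

        CachedGauge(NativeSource source) {
            this.source = source;
        }

        /** Like View#update(): pull the native value while still open. */
        void update() {
            if (open) {
                value = source.getLongProperty();
            }
        }

        /** Like Gauge#getValue(): return the last cached value. */
        long getValue() {
            return value;
        }

        /** Like Closeable#close(): stop refreshing, keep the last value. */
        void close() {
            open = false;
        }
    }

    public static void main(String[] args) {
        NativeSource source = new NativeSource();
        CachedGauge gauge = new CachedGauge(source);

        source.counter.set(42);
        gauge.update();
        System.out.println(gauge.getValue()); // prints 42

        gauge.close();
        source.counter.set(99);
        gauge.update(); // ignored after close()
        System.out.println(gauge.getValue()); // still prints 42
    }
}
```

The `volatile` fields matter because `update()` runs on the metrics timer thread while `getValue()` is called by reporter threads.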

[GitHub] TisonKun closed pull request #6830: [hotfix] [jobmaster] refactor suspendExecutionGraph and clearExecutio…

2018-10-12 Thread GitBox
TisonKun closed pull request #6830: [hotfix] [jobmaster] refactor 
suspendExecutionGraph and clearExecutio…
URL: https://github.com/apache/flink/pull/6830
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 31c6a97124b..f8d8362274d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1193,21 +1193,20 @@ private void 
suspendAndClearExecutionGraphFields(Exception cause) {
 
private void suspendExecutionGraph(Exception cause) {
executionGraph.suspend(cause);
+   }
 
+   private void clearExecutionGraphFields() {
if (jobManagerJobMetricGroup != null) {
jobManagerJobMetricGroup.close();
+   jobManagerJobMetricGroup = null;
}
 
if (jobStatusListener != null) {
jobStatusListener.stop();
+   jobStatusListener = null;
}
}
 
-   private void clearExecutionGraphFields() {
-   jobManagerJobMetricGroup = null;
-   jobStatusListener = null;
-   }
-
/**
 * Dispose the savepoint stored under the given path.
 *


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
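The refactor in the diff above moves the null-assignments into the same method that closes the resources, so a close-without-clear state can no longer be observed. A minimal sketch of that cleanup pattern (all names here are illustrative, not Flink code):

```java
public class CleanupSketch {

    static final class Resource {
        boolean closed;
        void close() { closed = true; }
    }

    private Resource resource = new Resource();

    /** Mirrors clearExecutionGraphFields(): close and null together,
     *  guarded by a null check so repeated calls are safe. */
    void clearFields() {
        if (resource != null) {
            resource.close();
            resource = null;
        }
    }

    boolean isCleared() {
        return resource == null;
    }

    public static void main(String[] args) {
        CleanupSketch sketch = new CleanupSketch();
        sketch.clearFields();
        System.out.println(sketch.isCleared()); // prints true
        sketch.clearFields(); // second call is a no-op, not an NPE
    }
}
```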


[GitHub] TisonKun commented on issue #6830: [hotfix] [jobmaster] refactor suspendExecutionGraph and clearExecutio…

2018-10-12 Thread GitBox
TisonKun commented on issue #6830: [hotfix] [jobmaster] refactor 
suspendExecutionGraph and clearExecutio…
URL: https://github.com/apache/flink/pull/6830#issuecomment-429349183
 
 
   @tillrohrmann OK, could you take it over and add it as a hotfix to that branch?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster

2018-10-12 Thread ouyangzhe (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647972#comment-16647972
 ] 

ouyangzhe commented on FLINK-10534:
---

Thanks [~till.rohrmann],

A Flink session cluster is created in two cases:

1. Flink sessions started by the user through yarn-session.sh.

2. The user runs Flink programs in interactive mode: a Flink session cluster
is started first, and it is shut down when the user program finishes
properly. But if the user's main program crashes or is killed, the session
cluster may be left behind.

So I propose adding an idle timeout configuration, giving users the ability
to handle this problem.

 

> Add idle timeout for a flink session cluster
> 
>
> Key: FLINK-10534
> URL: https://issues.apache.org/jira/browse/FLINK-10534
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Affects Versions: 1.7.0
>Reporter: ouyangzhe
>Assignee: ouyangzhe
>Priority: Major
> Attachments: 屏幕快照 2018-10-12 上午10.24.08.png
>
>
> A Flink session cluster on YARN keeps running even when it has no jobs
> running at all, occupying YARN resources for no use.
> TaskManagers are released after an idle timeout, but the JobManager keeps
> running.
> I propose adding a configuration to set an idle timeout for the JobManager
> too: if no job is running after the specified timeout, the Flink cluster
> shuts itself down.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] fhueske commented on a change in pull request #6792: [FLINK-10474][table] Don't translate IN/NOT_IN to JOIN with VALUES

2018-10-12 Thread GitBox
fhueske commented on a change in pull request #6792: [FLINK-10474][table] Don't 
translate IN/NOT_IN to JOIN with VALUES
URL: https://github.com/apache/flink/pull/6792#discussion_r224801724
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/ConvertToNotInOrInRule.scala
 ##
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
+import org.apache.calcite.rel.core.Filter
+import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlBinaryOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.{IN, NOT_IN, EQUALS, NOT_EQUALS, AND, OR}
+import org.apache.calcite.tools.RelBuilder
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+/**
+  * Rule for converting a cascade of predicates to [[IN]] or [[NOT_IN]].
+  *
+  * For example, convert predicate: (x = 1 OR x = 2 OR x = 3) AND y = 4 to
+  * predicate: x IN (1, 2, 3) AND y = 4.
+  *
+  * @param fromOperator The operator to convert from; for example, when
+  * converting to [[IN]], fromOperator is [[EQUALS]]:
+  * we convert a cascade of [[EQUALS]] to [[IN]].
+  * @param connectOperator  The operator that connects the fromOperator predicates.
+  * @param composedOperator The composed operator that may contain sub [[IN]]
+  * or [[NOT_IN]].
+  * @param toOperator   The operator to convert to; for example, when
+  * converting to [[IN]], toOperator is [[IN]].
+  * @param description  The description of the rule.
+  */
+class ConvertToNotInOrInRule(
+fromOperator: SqlBinaryOperator,
 
 Review comment:
   Only pass the `toOperator` parameter and set all other parameters 
appropriately?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
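The rewrite described in the ConvertToNotInOrInRule Scaladoc above — collapsing `x = 1 OR x = 2 OR x = 3` into `x IN (1, 2, 3)` — can be sketched outside Calcite. The real rule operates on `RexNode` trees; here a disjunct `x = 1` is modeled as a plain (field, literal) pair, so this is an illustration of the grouping step only:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class InRuleSketch {

    /** Collapse a disjunction of equality predicates into per-field IN sets:
     *  (x = 1 OR x = 2 OR x = 3) becomes x IN (1, 2, 3). */
    static Map<String, Set<Integer>> toIn(List<Map.Entry<String, Integer>> disjuncts) {
        Map<String, Set<Integer>> in = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> eq : disjuncts) {
            // All literals compared against the same field join one IN set.
            in.computeIfAbsent(eq.getKey(), k -> new LinkedHashSet<>())
              .add(eq.getValue());
        }
        return in;
    }

    public static void main(String[] args) {
        // Models the predicate: x = 1 OR x = 2 OR x = 3
        List<Map.Entry<String, Integer>> disjuncts = Arrays.asList(
            new SimpleEntry<>("x", 1),
            new SimpleEntry<>("x", 2),
            new SimpleEntry<>("x", 3));
        System.out.println(toIn(disjuncts)); // prints {x=[1, 2, 3]}
    }
}
```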


[jira] [Commented] (FLINK-10474) Don't translate IN with Literals to JOIN with VALUES for streaming queries

2018-10-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647957#comment-16647957
 ] 

ASF GitHub Bot commented on FLINK-10474:


fhueske commented on a change in pull request #6792: [FLINK-10474][table] Don't 
translate IN/NOT_IN to JOIN with VALUES
URL: https://github.com/apache/flink/pull/6792#discussion_r224778582
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
 ##
 @@ -351,4 +351,43 @@ class CalcITCase extends AbstractTestBase {
   "{9=Comment#3}")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testIn(): Unit = {
 
 Review comment:
   Same as for the streaming SQL test. Merge with an existing method to keep 
the overhead low.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Don't translate IN with Literals to JOIN with VALUES for streaming queries
> --
>
> Key: FLINK-10474
> URL: https://issues.apache.org/jira/browse/FLINK-10474
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> IN predicates with literals are translated to JOIN with VALUES if the number 
> of elements in the IN clause exceeds a certain threshold. This should not be 
> done, because a streaming join is very heavy and materializes both inputs 
> (which is fine for the VALUES input, but not for the other).
> There are two ways to solve this:
>  # don't translate IN to a JOIN at all
>  # translate it to a JOIN but have a special join strategy if one input is 
> bound and final (non-updating)
> Option 1. should be easy to do, option 2. requires much more effort.
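
Option 1 amounts to evaluating the IN predicate as a per-record set lookup inside a filter, so no join state is materialized for the streaming input. A minimal sketch of that evaluation strategy (plain Java over a list standing in for the stream; not Flink runtime code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class InFilterSketch {

    /** Keep only records whose value appears in the literal set. */
    static List<Integer> filterIn(List<Integer> records, Set<Integer> literals) {
        return records.stream()
                .filter(literals::contains) // O(1) lookup per record, no join state
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // x IN (1, 2, 3) applied record by record
        Set<Integer> literals = new HashSet<>(Arrays.asList(1, 2, 3));
        System.out.println(filterIn(Arrays.asList(0, 1, 2, 5, 3), literals)); // prints [1, 2, 3]
    }
}
```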



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

