[GitHub] yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#issuecomment-429502056 @tillrohrmann rebased and all checks have passed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648683#comment-16648683 ]

ASF GitHub Bot commented on FLINK-10135:

yanghua commented on issue #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#issuecomment-429502056

@tillrohrmann rebased and all checks have passed.

> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
> Issue Type: Bug
> Components: JobManager, Metrics
> Affects Versions: 1.5.0, 1.6.0, 1.7.0
> Reporter: Joey Echeverria
> Assignee: vinoyang
> Priority: Critical
> Labels: pull-request-available
>
> In [the documentation for
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
> in the Flink 1.5.0 release, it says that the following metrics are reported
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint
> ({{http://:8081/jobmanager/metrics}}), those metrics don't
> appear.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
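To make the report above easy to re-check, here is a minimal sketch in plain Java that scans a metrics listing for the four names from the issue. The class name `ClusterMetricsCheck` and the sample response are hypothetical, and the sketch assumes the endpoint lists metric ids as quoted JSON strings; fetching the body from the REST endpoint is left out on purpose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Checks a metrics listing for the four cluster-level metric names from the issue. */
final class ClusterMetricsCheck {

    // Names copied from the {noformat} block in the issue description.
    static final List<String> EXPECTED = Arrays.asList(
            "numRegisteredTaskManagers",
            "numRunningJobs",
            "taskSlotsAvailable",
            "taskSlotsTotal");

    /**
     * Returns the expected metric names that do not occur in the response body.
     * Assumption: metric ids appear as quoted JSON strings, so searching for the
     * name followed by a closing quote is enough for a smoke test.
     */
    static List<String> missingMetrics(String responseBody) {
        List<String> missing = new ArrayList<>();
        for (String name : EXPECTED) {
            if (!responseBody.contains(name + "\"")) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical response in which only two of the four metrics are listed.
        String body = "[{\"id\":\"Status.JVM.CPU.Load\"},"
                + "{\"id\":\"numRunningJobs\"},{\"id\":\"taskSlotsTotal\"}]";
        System.out.println("missing: " + missingMetrics(body));
    }
}
```

An empty result from `missingMetrics` would indicate that all four documented names are present in the listing; it says nothing about whether their values are correct.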
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648665#comment-16648665 ]

Shuyi Chen commented on FLINK-10516:

[~till.rohrmann], [~aljoscha], do you want me to cherry-pick the change onto the release-1.5 and release-1.6 branches? Thanks.

> YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink
> Configuration during setup
> ---
>
> Key: FLINK-10516
> URL: https://issues.apache.org/jira/browse/FLINK-10516
> Project: Flink
> Issue Type: Bug
> Components: YARN
> Affects Versions: 1.4.0, 1.5.0, 1.6.0, 1.7.0
> Reporter: Shuyi Chen
> Assignee: Shuyi Chen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
> Will add a fix, and refactor YarnApplicationMasterRunner to add a unittest to
> prevent future regression.
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648664#comment-16648664 ]

Shuyi Chen commented on FLINK-10516:

This is fixed in 1.7.0 with 5e90ed95a580aefd84b72f593954d01f4eb67f68.
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648660#comment-16648660 ]

ASF GitHub Bot commented on FLINK-10516:

asfgit closed pull request #6836: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initiali…
URL: https://github.com/apache/flink/pull/6836

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index e98e174cbdb..c275bfe0a02 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -27,6 +27,7 @@
 import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -67,6 +68,7 @@
 import org.slf4j.LoggerFactory;

 import java.io.File;
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -163,6 +165,13 @@ protected int run(String[] args) {

 			final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties, LOG);

+			// configure the filesystems
+			try {
+				FileSystem.initialize(flinkConfig);
+			} catch (IOException e) {
+				throw new IOException("Error while configuring the filesystems.", e);
+			}
+
 			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
 			if (remoteKeytabPrincipal != null && f.exists()) {
 				String keytabPath = f.getAbsolutePath();
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648657#comment-16648657 ]

ASF GitHub Bot commented on FLINK-10516:

yanyan300300 commented on issue #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
URL: https://github.com/apache/flink/pull/6828#issuecomment-429497882

Thanks all for reviewing. I closed this PR and opened https://github.com/apache/flink/pull/6836 against master instead of Flink 1.4, and removed the unit test.
[jira] [Commented] (FLINK-10542) Register Hive metastore as an external catalog in TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-10542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648654#comment-16648654 ]

sunjincheng commented on FLINK-10542:

+1. This is very important work for building out the Flink ecosystem. [~xuefuz]

> Register Hive metastore as an external catalog in TableEnvironment
> ---
>
> Key: FLINK-10542
> URL: https://issues.apache.org/jira/browse/FLINK-10542
> Project: Flink
> Issue Type: New Feature
> Components: Table API SQL
> Affects Versions: 1.6.0
> Reporter: Xuefu Zhang
> Assignee: Xuefu Zhang
> Priority: Major
>
> Similar to FLINK-2167, but rather register the Hive metastore as an external
> catalog in the {{TableEnvironment}}. After registration, Table API and SQL
> queries should be able to access all Hive tables.
> This might supersede the need for FLINK-2167 because the Hive metastore stores a
> superset of the tables available via hCat without an indirection.
[jira] [Created] (FLINK-10542) Register Hive metastore as an external catalog in TableEnvironment
Xuefu Zhang created FLINK-10542:
---

Summary: Register Hive metastore as an external catalog in TableEnvironment
Key: FLINK-10542
URL: https://issues.apache.org/jira/browse/FLINK-10542
Project: Flink
Issue Type: New Feature
Components: Table API SQL
Affects Versions: 1.6.0
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang

Similar to FLINK-2167, but rather register the Hive metastore as an external catalog in the {{TableEnvironment}}. After registration, Table API and SQL queries should be able to access all Hive tables.

This might supersede the need for FLINK-2167 because the Hive metastore stores a superset of the tables available via hCat without an indirection.
[GitHub] suez1224 commented on issue #6836: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initiali…
suez1224 commented on issue #6836: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initiali…
URL: https://github.com/apache/flink/pull/6836#issuecomment-429481759

LGTM. Will merge after the tests run through.
[jira] [Commented] (FLINK-6036) Let catalog support partition
[ https://issues.apache.org/jira/browse/FLINK-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648525#comment-16648525 ]

Xuefu Zhang commented on FLINK-6036:

Hi [~jinyu.zj], are you interested in moving this forward? This seems to be a prerequisite for the Flink-Hive integration that was under discussion in the community. Thanks.

> Let catalog support partition
> ---
>
> Key: FLINK-6036
> URL: https://issues.apache.org/jira/browse/FLINK-6036
> Project: Flink
> Issue Type: Sub-task
> Components: Table API SQL
> Reporter: jingzhang
> Assignee: jingzhang
> Priority: Major
>
> Currently the catalog only supports CRUD at the database and table level. But for
> some kinds of catalog, for example Hive, we also need to do CRUD operations at
> the partition level.
> This issue aims to let the catalog support partitions.
[GitHub] yanyan300300 opened a new pull request #6836: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initiali…
yanyan300300 opened a new pull request #6836: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initiali…
URL: https://github.com/apache/flink/pull/6836

## What is the purpose of the change

This pull request makes YarnApplicationMasterRunner explicitly initialize FileSystem with Flink Configuration, so that the HadoopFileSystem can take in the custom HDFS configuration (e.g. core-site.xml under fs.hdfs.hadoopconf).

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable
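The failure mode described in the PR can be pictured with a toy model in plain Java. This is only a sketch under stated assumptions: `ToyFileSystemRegistry` is a hypothetical stand-in, not Flink's `FileSystem` class, and the path `/etc/hadoop/conf` is made up. It shows a process-wide registry that only sees the user's settings once `initialize` has been called explicitly.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a process-wide file system registry; not Flink's FileSystem class. */
final class ToyFileSystemRegistry {

    // Until initialize() is called, lookups see an empty configuration.
    private static Map<String, String> config = Collections.emptyMap();

    /** Installs the given configuration for all later lookups. */
    static synchronized void initialize(Map<String, String> flinkConfig) {
        config = new HashMap<>(flinkConfig);
    }

    /** Returns the configured Hadoop conf directory, or null when none was installed. */
    static synchronized String hadoopConfDir() {
        return config.get("fs.hdfs.hadoopconf");
    }

    public static void main(String[] args) {
        Map<String, String> flinkConfig = new HashMap<>();
        flinkConfig.put("fs.hdfs.hadoopconf", "/etc/hadoop/conf"); // hypothetical path

        // Without the explicit call, the registry knows nothing about the custom setting.
        System.out.println("before initialize: " + hadoopConfDir());
        initialize(flinkConfig);
        System.out.println("after initialize: " + hadoopConfDir());
    }
}
```

The point of the sketch is the ordering: any component that reads the registry before `initialize` runs falls back to defaults, which matches the symptom the PR sets out to fix.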
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648492#comment-16648492 ]

ASF GitHub Bot commented on FLINK-10516:

suez1224 commented on issue #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
URL: https://github.com/apache/flink/pull/6828#issuecomment-429471339

@yanyan300300, I am fine with adding just the fix without the unit test, given that the code will be deprecated in 1.7 by [FLINK-10392](https://issues.apache.org/jira/browse/FLINK-10392).
[jira] [Closed] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske closed FLINK-10156.
---
Resolution: Done
Fix Version/s: 1.7.0

Done with d1a03dd239555298da9ac9be4ea94ccf52d9887b

> Drop the Table.writeToSink() method
> ---
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a
> table into a table that was previously registered with a {{TableSink}} in the
> catalog. It is the inverse method to the {{scan()}} method and the equivalent
> to an {{INSERT INTO ... SELECT}} SQL query.
>
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks
> (and TableSources) should only be registered with the {{TableEnvironment}}
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is
> more aligned with SQL.
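The register-then-insert flow argued for above can be sketched with a toy catalog in plain Java. Everything here is hypothetical and for illustration only: `ToySinkCatalog` is not Flink's `TableEnvironment`, rows are plain `Object[]` arrays, and the schema check compares field names only, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy catalog of named sinks; a stand-in for illustration, not Flink's TableEnvironment. */
final class ToySinkCatalog {

    /** A registered sink: its declared field names plus the rows written so far. */
    private static final class Sink {
        final List<String> fieldNames;
        final List<Object[]> rows = new ArrayList<>();

        Sink(List<String> fieldNames) {
            this.fieldNames = fieldNames;
        }
    }

    private final Map<String, Sink> sinks = new HashMap<>();

    /** Registers a sink under a name together with its schema (field names only). */
    void registerTableSink(String name, String... fieldNames) {
        sinks.put(name, new Sink(Arrays.asList(fieldNames)));
    }

    /** Looks the sink up by name, checks the schema, then appends the rows. */
    void insertInto(String name, List<String> tableFields, List<Object[]> rows) {
        Sink sink = sinks.get(name);
        if (sink == null) {
            throw new IllegalArgumentException("No sink registered under '" + name + "'");
        }
        if (!sink.fieldNames.equals(tableFields)) {
            throw new IllegalArgumentException(
                    "Schema mismatch: table " + tableFields + " vs sink " + sink.fieldNames);
        }
        sink.rows.addAll(rows);
    }

    /** Number of rows written to the named sink so far; the name must be registered. */
    int rowCount(String name) {
        return sinks.get(name).rows.size();
    }

    public static void main(String[] args) {
        ToySinkCatalog catalog = new ToySinkCatalog();
        catalog.registerTableSink("CsvSinkTable", "a", "b", "c");

        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {1, "x", 2L});
        catalog.insertInto("CsvSinkTable", Arrays.asList("a", "b", "c"), rows);
        System.out.println("rows written: " + catalog.rowCount("CsvSinkTable"));
    }
}
```

In this model the query side only ever names the target, which mirrors reason 2 above: the sink instance stays with the catalog and is never handed to the code that produces the rows.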
[jira] [Commented] (FLINK-10528) Remove deprecated APIs from Table API for Flink 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648479#comment-16648479 ]

ASF GitHub Bot commented on FLINK-10528:

asfgit closed pull request #6826: [FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0
URL: https://github.com/apache/flink/pull/6826

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index a3b7093dd25..3767e5a1a99 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -457,7 +457,7 @@ public void testCassandraTableSink() throws Exception {

 		tEnv.registerDataStreamInternal("testFlinkTable", source);

-		tEnv.sql("select * from testFlinkTable").writeToSink(
+		tEnv.sqlQuery("select * from testFlinkTable").writeToSink(
 			new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY)));

 		env.execute();
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index 27b75d49ac9..4bb0f31978c 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -51,7 +51,7 @@
  * hSrc.addColumn("fam2", "col1", String.class);
  *
  * tableEnv.registerTableSource("hTable", hSrc);
- * Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
+ * Table res = tableEnv.sqlQuery("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1");
  * }
  *
  *
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
index 860999c80db..6e3ada4c493 100644
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
+++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
@@ -75,7 +75,7 @@
  * .build();
  *
  * tEnv.registerTableSource("orcTable", orcSrc);
- * Table res = tableEnv.sql("SELECT * FROM orcTable");
+ * Table res = tableEnv.sqlQuery("SELECT * FROM orcTable");
  * }
  *
  */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index d740c3f1f99..6fe0a9e3a64 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -656,30 +656,6 @@ abstract class TableEnvironment(val config: TableConfig) {
     */
   def explain(table: Table): String

-  /**
-    * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
-    *
-    * All tables referenced by the query must be registered in the TableEnvironment.
-    * A [[Table]] is automatically registered when its [[toString]] method is called, for example
-    * when it is embedded into a String.
-    * Hence, SQL queries can directly reference a [[Table]] as follows:
-    *
-    * {{{
-    *   val table: Table = ...
-    *   // the table is not registered to the table environment
-    *   tEnv.sql(s"SELECT * FROM $table")
-    * }}}
-    *
-    * @deprecated Use sqlQuery() instead.
-    * @param query The SQL query to evaluate.
-    * @return The result of the query as Table.
-    */
-  @Deprecated
-  @deprecated("Please use sqlQuery() instead.")
-  def sql(query: String): Table = {
-    sqlQuery(query)
-  }
-
   /**
     * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]].
     *
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala
index 2da08faa12e..53fb7a08d47 100644
---
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648478#comment-16648478 ]

ASF GitHub Bot commented on FLINK-10156:

asfgit closed pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 146d1a6c1fd..622fe9ffedf 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -46,6 +46,8 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 tableEnv.registerTable("table1", ...)            // or
 tableEnv.registerTableSource("table2", ...);     // or
 tableEnv.registerExternalCatalog("extCat", ...);
+// register an output Table
+tableEnv.registerTableSink("outputTable", ...);

 // create a Table from a Table API query
 Table tapiResult = tableEnv.scan("table1").select(...);
@@ -53,7 +55,7 @@ Table tapiResult = tableEnv.scan("table1").select(...);
 Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.writeToSink(...);
+tapiResult.insertInto("outputTable");

 // execute
 env.execute();
@@ -72,7 +74,9 @@ val tableEnv = TableEnvironment.getTableEnvironment(env)
 // register a Table
 tableEnv.registerTable("table1", ...)           // or
 tableEnv.registerTableSource("table2", ...)     // or
-tableEnv.registerExternalCatalog("extCat", ...)
+tableEnv.registerExternalCatalog("extCat", ...)
+// register an output Table
+tableEnv.registerTableSink("outputTable", ...);

 // create a Table from a Table API query
 val tapiResult = tableEnv.scan("table1").select(...)
@@ -80,7 +84,7 @@ val tapiResult = tableEnv.scan("table1").select(...)
 val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.writeToSink(...)
+tapiResult.insertInto("outputTable")

 // execute
 env.execute()
@@ -500,10 +504,7 @@ A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Ta

 Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.

-There are two ways to emit a table:
-
-1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit.
-2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`.
+The `Table.insertInto(String tableName)` method emits the `Table` to a registered `TableSink`. The method looks up the `TableSink` from the catalog by the name and validates that the schema of the `Table` is identical to the schema of the `TableSink`.

 The following examples shows how to emit a `Table`:

@@ -513,22 +514,17 @@ The following examples shows how to emit a `Table`:
 // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
 StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

-// compute a result Table using Table API operators and/or SQL queries
-Table result = ...
-
 // create a TableSink
 TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

-// METHOD 1:
-// Emit the result Table to the TableSink via the writeToSink() method
-result.writeToSink(sink);
-
-// METHOD 2:
-// Register the TableSink with a specific schema
+// register the TableSink with a specific schema
 String[] fieldNames = {"a", "b", "c"};
 TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
 tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
-// Emit the result Table to the registered TableSink via the insertInto() method
+
+// compute a result Table using Table API operators and/or SQL queries
+Table result = ...
+// emit the result Table to the registered TableSink
 result.insertInto("CsvSinkTable");

 // execute the program
@@ -540,22 +536,18 @@ result.insertInto("CsvSinkTable");
 // get a TableEnvironment
 val tableEnv = TableEnvironment.getTableEnvironment(env)

-// compute a result Table using Table API operators and/or SQL queries
-val result: Table = ...
-
 // create a TableSink
 val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

-// METHOD 1:
-// Emit the result Table to
[jira] [Closed] (FLINK-10528) Remove deprecated APIs from Table API for Flink 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske closed FLINK-10528.
---------------------------------
    Resolution: Done
    Fix Version/s: 1.7.0

Done with 5e90ed95a580aefd84b72f593954d01f4eb67f68

> Remove deprecated APIs from Table API for Flink 1.7.0
> -----------------------------------------------------
>
> Key: FLINK-10528
> URL: https://issues.apache.org/jira/browse/FLINK-10528
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Affects Versions: 1.7.0
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.7.0
>
> There are a few APIs that have been deprecated for a while (since Flink
> 1.4.0) and that could be removed:
> * {{TableEnvironment.sql()}} as of FLINK-6442
> * {{StreamTableEnvironment.toDataStream()}} and
> {{TableConversions.toDataStream()}} as of FLINK-6543
> * {{Table.limit()}} as of FLINK-7821

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
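The removed `sql()` method was a thin, deprecated wrapper that forwarded to `sqlQuery()`, which is why callers can switch over mechanically. A minimal sketch of that forwarding pattern follows; `DeprecatedForwardingSketch` and `QueryEnvironment` are hypothetical stand-ins, not Flink's `TableEnvironment`, and the string result merely takes the place of a `Table`.

```java
// Hypothetical stand-in types; this is not Flink's TableEnvironment.
final class DeprecatedForwardingSketch {

    static final class QueryEnvironment {

        /** The replacement method: all real work happens here. */
        String sqlQuery(String query) {
            return "plan(" + query.trim() + ")";
        }

        /** @deprecated Use {@link #sqlQuery(String)} instead. */
        @Deprecated
        String sql(String query) {
            // Pure forwarding, so old and new call sites behave identically.
            return sqlQuery(query);
        }
    }

    public static void main(String[] args) {
        QueryEnvironment env = new QueryEnvironment();
        String viaOldName = env.sql("SELECT * FROM t");
        String viaNewName = env.sqlQuery("SELECT * FROM t");
        // Both call paths produce the same result, so renaming the call is safe.
        System.out.println(viaOldName.equals(viaNewName));
    }
}
```

Because the deprecated method adds no behaviour of its own, removing it only requires renaming the call at each call site, which is what the connector changes in the pull request appear to do.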
[GitHub] asfgit closed pull request #6826: [FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0
asfgit closed pull request #6826: [FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0 URL: https://github.com/apache/flink/pull/6826 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index a3b7093dd25..3767e5a1a99 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -457,7 +457,7 @@ public void testCassandraTableSink() throws Exception { tEnv.registerDataStreamInternal("testFlinkTable", source); - tEnv.sql("select * from testFlinkTable").writeToSink( + tEnv.sqlQuery("select * from testFlinkTable").writeToSink( new CassandraAppendTableSink(builder, injectTableName(INSERT_DATA_QUERY))); env.execute(); diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java index 27b75d49ac9..4bb0f31978c 100644 --- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java +++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java @@ -51,7 +51,7 @@ * hSrc.addColumn("fam2", "col1", String.class); * * tableEnv.registerTableSource("hTable", hSrc); - * Table res = tableEnv.sql("SELECT t.fam2.col1, SUM(t.fam1.col2) 
FROM hTable AS t GROUP BY t.fam2.col1"); + * Table res = tableEnv.sqlQuery("SELECT t.fam2.col1, SUM(t.fam1.col2) FROM hTable AS t GROUP BY t.fam2.col1"); * } * * diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java index 860999c80db..6e3ada4c493 100644 --- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java +++ b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java @@ -75,7 +75,7 @@ * .build(); * * tEnv.registerTableSource("orcTable", orcSrc); - * Table res = tableEnv.sql("SELECT * FROM orcTable"); + * Table res = tableEnv.sqlQuery("SELECT * FROM orcTable"); * } * */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index d740c3f1f99..6fe0a9e3a64 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -656,30 +656,6 @@ abstract class TableEnvironment(val config: TableConfig) { */ def explain(table: Table): String - /** -* Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. -* -* All tables referenced by the query must be registered in the TableEnvironment. -* A [[Table]] is automatically registered when its [[toString]] method is called, for example -* when it is embedded into a String. -* Hence, SQL queries can directly reference a [[Table]] as follows: -* -* {{{ -* val table: Table = ... -* // the table is not registered to the table environment -* tEnv.sql(s"SELECT * FROM $table") -* }}} -* -* @deprecated Use sqlQuery() instead. -* @param query The SQL query to evaluate. -* @return The result of the query as Table. 
-*/ - @Deprecated - @deprecated("Please use sqlQuery() instead.") - def sql(query: String): Table = { -sqlQuery(query) - } - /** * Evaluates a SQL query on registered tables and retrieves the result as a [[Table]]. * diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala index 2da08faa12e..53fb7a08d47 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/StreamTableEnvironment.scala @@ -133,115 +133,6 @@ class StreamTableEnvironment(
[GitHub] asfgit closed pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
asfgit closed pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md index 146d1a6c1fd..622fe9ffedf 100644 --- a/docs/dev/table/common.md +++ b/docs/dev/table/common.md @@ -46,6 +46,8 @@ StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); tableEnv.registerTable("table1", ...)// or tableEnv.registerTableSource("table2", ...); // or tableEnv.registerExternalCatalog("extCat", ...); +// register an output Table +tableEnv.registerTableSink("outputTable", ...); // create a Table from a Table API query Table tapiResult = tableEnv.scan("table1").select(...); @@ -53,7 +55,7 @@ Table tapiResult = tableEnv.scan("table1").select(...); Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... "); // emit a Table API result Table to a TableSink, same for SQL result -tapiResult.writeToSink(...); +tapiResult.insertInto("outputTable"); // execute env.execute(); @@ -72,7 +74,9 @@ val tableEnv = TableEnvironment.getTableEnvironment(env) // register a Table tableEnv.registerTable("table1", ...) // or tableEnv.registerTableSource("table2", ...) // or -tableEnv.registerExternalCatalog("extCat", ...) +tableEnv.registerExternalCatalog("extCat", ...) +// register an output Table +tableEnv.registerTableSink("outputTable", ...); // create a Table from a Table API query val tapiResult = tableEnv.scan("table1").select(...) @@ -80,7 +84,7 @@ val tapiResult = tableEnv.scan("table1").select(...) val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...") // emit a Table API result Table to a TableSink, same for SQL result -tapiResult.writeToSink(...) 
+tapiResult.insertInto("outputTable") // execute env.execute() @@ -500,10 +504,7 @@ A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Ta Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`. -There are two ways to emit a table: - -1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit. -2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`. +The `Table.insertInto(String tableName)` method emits the `Table` to a registered `TableSink`. The method looks up the `TableSink` from the catalog by the name and validates that the schema of the `Table` is identical to the schema of the `TableSink`. The following examples shows how to emit a `Table`: @@ -513,22 +514,17 @@ The following examples shows how to emit a `Table`: // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); -// compute a result Table using Table API operators and/or SQL queries -Table result = ... 
- // create a TableSink TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); -// METHOD 1: -// Emit the result Table to the TableSink via the writeToSink() method -result.writeToSink(sink); - -// METHOD 2: -// Register the TableSink with a specific schema +// register the TableSink with a specific schema String[] fieldNames = {"a", "b", "c"}; TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink); -// Emit the result Table to the registered TableSink via the insertInto() method + +// compute a result Table using Table API operators and/or SQL queries +Table result = ... +// emit the result Table to the registered TableSink result.insertInto("CsvSinkTable"); // execute the program @@ -540,22 +536,18 @@ result.insertInto("CsvSinkTable"); // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) -// compute a result Table using Table API operators and/or SQL queries -val result: Table = ... - // create a TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") -// METHOD 1: -// Emit the result Table to the TableSink via the writeToSink() method -result.writeToSink(sink) - -// METHOD 2: -// Register the TableSink with a specific schema +// register the TableSink with a specific schema val fieldNames: Array[String] = Array("a", "b", "c")
[jira] [Resolved] (FLINK-10532) Broken links in documentation
[ https://issues.apache.org/jira/browse/FLINK-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-10532. -- Resolution: Fixed Fix Version/s: 1.6.2 Fixed in 1.7.0: 383cf887f9936d53d37ba907e03522fb8c88a67d Fixed in 1.6.2: 8601555896b2016804303ce7baa6a8b49dd3b07e > Broken links in documentation > - > > Key: FLINK-10532 > URL: https://issues.apache.org/jira/browse/FLINK-10532 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Timo Walther >Priority: Major > Fix For: 1.7.0, 1.6.2 > > > https://travis-ci.org/apache/flink/builds/440115490#L599 > {code:java} > http://localhost:4000/dev/stream/operators.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/dev/table/streaming/sql.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/dev/table/streaming.html: > Remote file does not exist -- broken link!!!{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648401#comment-16648401 ]

ASF GitHub Bot commented on FLINK-10156:
----------------------------------------

fhueske commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429448411

   merging

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Drop the Table.writeToSink() method
> -----------------------------------
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Major
> Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a
> table into a table that was previously registered with a {{TableSink}} in the
> catalog. It is the inverse method to the {{scan()}} method and the equivalent
> to an {{INSERT INTO ... SELECT}} SQL query.
>
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks
> (and TableSources) should only be registered with the {{TableEnvironment}}
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is
> more aligned with SQL.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
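The lookup-and-validate behaviour described for `insertInto()` can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: `SinkCatalogSketch`, `RegisteredSink`, and `Catalog` are hypothetical names, not Flink classes, and comparing plain type-name strings is a simplification of whatever schema check Flink actually performs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types exist in Flink.
final class SinkCatalogSketch {

    /** Hypothetical sink that was registered together with a fixed schema. */
    static final class RegisteredSink {
        final List<String> fieldNames;
        final List<String> fieldTypes;
        int emittedTables = 0;

        RegisteredSink(List<String> fieldNames, List<String> fieldTypes) {
            this.fieldNames = fieldNames;
            this.fieldTypes = fieldTypes;
        }
    }

    /** Hypothetical catalog: sinks are registered once and later addressed by name only. */
    static final class Catalog {
        private final Map<String, RegisteredSink> sinks = new HashMap<>();

        void registerTableSink(String name, RegisteredSink sink) {
            if (sinks.putIfAbsent(name, sink) != null) {
                throw new IllegalArgumentException("A sink is already registered as '" + name + "'");
            }
        }

        /** Looks the sink up by name, then checks the emitted schema against the registered one. */
        void insertInto(String name, List<String> tableFieldTypes) {
            RegisteredSink sink = sinks.get(name);
            if (sink == null) {
                throw new IllegalArgumentException("No sink registered as '" + name + "'");
            }
            if (!sink.fieldTypes.equals(tableFieldTypes)) {
                throw new IllegalArgumentException(
                    "Schema mismatch: sink expects " + sink.fieldTypes + " but table has " + tableFieldTypes);
            }
            sink.emittedTables++;
        }
    }

    public static void main(String[] args) {
        Catalog catalog = new Catalog();
        RegisteredSink csv = new RegisteredSink(
            Arrays.asList("a", "b", "c"), Arrays.asList("INT", "STRING", "LONG"));
        catalog.registerTableSink("CsvSinkTable", csv);

        // The "query part" only needs the name, never the sink instance.
        catalog.insertInto("CsvSinkTable", Arrays.asList("INT", "STRING", "LONG"));
        System.out.println("emitted tables: " + csv.emittedTables);

        try {
            // A table with a different schema is rejected before anything is written.
            catalog.insertInto("CsvSinkTable", Arrays.asList("INT", "STRING"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the model is reason 2 in the proposal: once sinks live in the catalog, query code refers to them by name, and the sink instance itself stays on the registration side.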
[jira] [Commented] (FLINK-10528) Remove deprecated APIs from Table API for Flink 1.7.0
[ https://issues.apache.org/jira/browse/FLINK-10528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648402#comment-16648402 ]

ASF GitHub Bot commented on FLINK-10528:
----------------------------------------

fhueske commented on issue #6826: [FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0
URL: https://github.com/apache/flink/pull/6826#issuecomment-429448476

   merging

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Remove deprecated APIs from Table API for Flink 1.7.0
> -----------------------------------------------------
>
> Key: FLINK-10528
> URL: https://issues.apache.org/jira/browse/FLINK-10528
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Affects Versions: 1.7.0
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Minor
> Labels: pull-request-available
>
> There are a few APIs that have been deprecated for a while (since Flink
> 1.4.0) and that could be removed:
> * {{TableEnvironment.sql()}} as of FLINK-6442
> * {{StreamTableEnvironment.toDataStream()}} and
> {{TableConversions.toDataStream()}} as of FLINK-6543
> * {{Table.limit()}} as of FLINK-7821

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
fhueske commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#issuecomment-429448411 merging This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] fhueske commented on issue #6826: [FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0
fhueske commented on issue #6826: [FLINK-10528] [table] Remove methods that were deprecated in Flink 1.4.0 URL: https://github.com/apache/flink/pull/6826#issuecomment-429448476 merging This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10541) Removed unused code depends on LocalFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10541: --- Labels: pull-request-available (was: ) > Removed unused code depends on LocalFlinkMiniCluster > > > Key: FLINK-10541 > URL: https://issues.apache.org/jira/browse/FLINK-10541 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > {{TestBaseUtils#startCluster}} depends on legacy class > {{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove > it and help confirm we can directly remove {{LocalFlinkMiniCluster}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun opened a new pull request #6835: [FLINK-10541] [tests] Removed unused code depends on LocalFlinkMiniCl…
TisonKun opened a new pull request #6835: [FLINK-10541] [tests] Removed unused code depends on LocalFlinkMiniCl…
URL: https://github.com/apache/flink/pull/6835

   …uster

   ## What is the purpose of the change

   `TestBaseUtils#startCluster` and `TestBaseUtils#stopCluster` depend on the legacy class `LocalFlinkMiniCluster`; however, these methods are unused. Let's remove them, which also helps confirm that we can remove `LocalFlinkMiniCluster` directly.

   ## Verifying this change

   This change is a trivial rework / code cleanup without any test coverage.

   ## Does this pull request potentially affect one of the following parts:

     - Dependencies (does it add or upgrade a dependency): (**no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
     - The serializers: (**no**)
     - The runtime per-record code paths (performance sensitive): (**no**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
     - The S3 file system connector: (**no**)

   ## Documentation

     - Does this pull request introduce a new feature? (**no**)
     - If yes, how is the feature documented? (**no**)

   cc @tillrohrmann @zentol

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Commented] (FLINK-10541) Removed unused code depends on LocalFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648399#comment-16648399 ]

ASF GitHub Bot commented on FLINK-10541:
----------------------------------------

TisonKun opened a new pull request #6835: [FLINK-10541] [tests] Removed unused code depends on LocalFlinkMiniCl…
URL: https://github.com/apache/flink/pull/6835

   …uster

   ## What is the purpose of the change

   `TestBaseUtils#startCluster` and `TestBaseUtils#stopCluster` depend on the legacy class `LocalFlinkMiniCluster`; however, these methods are unused. Let's remove them, which also helps confirm that we can remove `LocalFlinkMiniCluster` directly.

   ## Verifying this change

   This change is a trivial rework / code cleanup without any test coverage.

   ## Does this pull request potentially affect one of the following parts:

     - Dependencies (does it add or upgrade a dependency): (**no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
     - The serializers: (**no**)
     - The runtime per-record code paths (performance sensitive): (**no**)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
     - The S3 file system connector: (**no**)

   ## Documentation

     - Does this pull request introduce a new feature? (**no**)
     - If yes, how is the feature documented? (**no**)

   cc @tillrohrmann @zentol

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Removed unused code depends on LocalFlinkMiniCluster
> ----------------------------------------------------
>
> Key: FLINK-10541
> URL: https://issues.apache.org/jira/browse/FLINK-10541
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: TisonKun
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> {{TestBaseUtils#startCluster}} depends on legacy class
> {{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove
> it and help confirm we can directly remove {{LocalFlinkMiniCluster}}.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10541) Removed unused code depends on LocalFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun updated FLINK-10541: - Summary: Removed unused code depends on LocalFlinkMiniCluster (was: Removed unused code of LocalFlinkMiniCluster) > Removed unused code depends on LocalFlinkMiniCluster > > > Key: FLINK-10541 > URL: https://issues.apache.org/jira/browse/FLINK-10541 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.7.0 > > > {{TestBaseUtils#startCluster}} depends on legacy class > {{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove > it and help confirm we can directly remove {{LocalFlinkMiniCluster}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10541) Removed unused code of LocalFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun updated FLINK-10541: - Issue Type: Sub-task (was: Task) Parent: FLINK-10392 > Removed unused code of LocalFlinkMiniCluster > > > Key: FLINK-10541 > URL: https://issues.apache.org/jira/browse/FLINK-10541 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Fix For: 1.7.0 > > > {{TestBaseUtils#startCluster}} depends on legacy class > {{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove > it and help confirm we can directly remove {{LocalFlinkMiniCluster}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10541) Removed unused code of LocalFlinkMiniCluster
TisonKun created FLINK-10541: Summary: Removed unused code of LocalFlinkMiniCluster Key: FLINK-10541 URL: https://issues.apache.org/jira/browse/FLINK-10541 Project: Flink Issue Type: Task Components: Tests Affects Versions: 1.7.0 Reporter: TisonKun Assignee: TisonKun Fix For: 1.7.0 {{TestBaseUtils#startCluster}} depends on legacy class {{LocalFlinkMiniCluster}}, however, this class itself is unused. Let's remove it and help confirm we can directly remove {{LocalFlinkMiniCluster}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10540) Remove legacy LocalFlinkMiniCluster
TisonKun created FLINK-10540:
--------------------------------

Summary: Remove legacy LocalFlinkMiniCluster
Key: FLINK-10540
URL: https://issues.apache.org/jira/browse/FLINK-10540
Project: Flink
Issue Type: Sub-task
Affects Versions: 1.7.0
Reporter: TisonKun
Fix For: 1.7.0

{{LocalFlinkMiniCluster}} is based on the legacy cluster mode and should no longer be used.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648353#comment-16648353 ]

ASF GitHub Bot commented on FLINK-10156:
----------------------------------------

fhueske commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429437884

   Thanks for the review @sunjincheng121!

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Drop the Table.writeToSink() method
> -----------------------------------
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
> Issue Type: Improvement
> Components: Table API SQL
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Major
> Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a
> table into a table that was previously registered with a {{TableSink}} in the
> catalog. It is the inverse method to the {{scan()}} method and the equivalent
> to an {{INSERT INTO ... SELECT}} SQL query.
>
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks
> (and TableSources) should only be registered with the {{TableEnvironment}}
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is
> more aligned with SQL.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fhueske commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
fhueske commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#issuecomment-429437884 Thanks for the review @sunjincheng121! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10508) Port JobManagerITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648350#comment-16648350 ]

ASF GitHub Bot commented on FLINK-10508:
----------------------------------------

TisonKun commented on issue #6834: [FLINK-10508] [tests] Port JobManagerITCase to new code base
URL: https://github.com/apache/flink/pull/6834#issuecomment-429436854

   After the commits above, four tests remain in `JobManagerITCase` that I am not sure how to handle, so I'd like to cc @tillrohrmann and @tzulitai for advice.

   1. `"JobManagerITCase.The JobManager actor must remove execution graphs when the client ends the session explicitly"` and `"JobManagerITCase.The JobManager actor must remove execution graphs when when the client's session times out"`: since `sessionTimeout` no longer appears to be used in the FLIP-6 codebase, and the execution graph archiving mechanism is different, I tend to remove them. @tillrohrmann

   2. `"JobManagerITCase.The JobManager actor must handle trigger savepoint response after trigger savepoint failure"` and `"JobManagerITCase.The JobManager actor must handle failed savepoint triggering"`: these two tests check the "message" of a failed savepoint trigger. Since FLIP-6 is not directly based on Akka, I don't think these checks are necessary, so I tend to remove them. However, I want to know whether we have tests that cover the negative cases of savepoints. @tzulitai

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Port JobManagerITCase to new code base
> --------------------------------------
>
> Key: FLINK-10508
> URL: https://issues.apache.org/jira/browse/FLINK-10508
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: TisonKun
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Port {{JobManagerITCase}} to new code base.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on issue #6834: [FLINK-10508] [tests] Port JobManagerITCase to new code base
TisonKun commented on issue #6834: [FLINK-10508] [tests] Port JobManagerITCase to new code base
URL: https://github.com/apache/flink/pull/6834#issuecomment-429436854

   After the commits above, four tests remain in `JobManagerITCase` that I am not sure how to handle, so I'd like to cc @tillrohrmann and @tzulitai for advice.

   1. `"JobManagerITCase.The JobManager actor must remove execution graphs when the client ends the session explicitly"` and `"JobManagerITCase.The JobManager actor must remove execution graphs when when the client's session times out"`: since `sessionTimeout` no longer appears to be used in the FLIP-6 codebase, and the execution graph archiving mechanism is different, I tend to remove them. @tillrohrmann

   2. `"JobManagerITCase.The JobManager actor must handle trigger savepoint response after trigger savepoint failure"` and `"JobManagerITCase.The JobManager actor must handle failed savepoint triggering"`: these two tests check the "message" of a failed savepoint trigger. Since FLIP-6 is not directly based on Akka, I don't think these checks are necessary, so I tend to remove them. However, I want to know whether we have tests that cover the negative cases of savepoints. @tzulitai

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Assigned] (FLINK-10532) Broken links in documentation
[ https://issues.apache.org/jira/browse/FLINK-10532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther reassigned FLINK-10532: Assignee: Timo Walther > Broken links in documentation > - > > Key: FLINK-10532 > URL: https://issues.apache.org/jira/browse/FLINK-10532 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Timo Walther >Priority: Major > Fix For: 1.7.0 > > > https://travis-ci.org/apache/flink/builds/440115490#L599 > {code:java} > http://localhost:4000/dev/stream/operators.html: > Remote file does not exist -- broken link!!! > -- > http://localhost:4000/dev/table/streaming/sql.html: > Remote file does not exist -- broken link!!! > http://localhost:4000/dev/table/streaming.html: > Remote file does not exist -- broken link!!!{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648269#comment-16648269 ] ASF GitHub Bot commented on FLINK-10423: sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224876736 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. 
+* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, + @Nonnull
[GitHub] sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224876736 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. 
All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. +* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, + @Nonnull ColumnFamilyHandle 
handle, + @Nonnull RocksDB db + ) { + this.property = property; + this.handle = handle; + this.db = db; +
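The quoted diff is cut off before the gauge's methods, so the following is only a rough, dependency-free sketch of the pattern the class comment describes: one cached long-valued gauge per configured property, registered per column family, and switched off when the monitor closes. Everything in it is hypothetical and is not the PR's code: `NativeMetricSketch`, `CachedGauge`, `Monitor`, `updateAll`, `read`, and the `ToLongFunction` standing in for a native property lookup are illustration-only names, and the update/close behaviour is an assumption based on the `open` and `value` fields visible above.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

public class NativeMetricSketch {

    /** Hypothetical gauge: caches the last value read and stops reading once closed. */
    static final class CachedGauge implements Closeable {
        private final LongSupplier source; // stand-in for a native property lookup
        private volatile boolean open = true;
        private volatile long value;

        CachedGauge(LongSupplier source) {
            this.source = source;
        }

        /** Refreshes the cached value; a no-op after close(). */
        void update() {
            if (open) {
                value = source.getAsLong();
            }
        }

        long getValue() {
            return value;
        }

        @Override
        public void close() {
            open = false;
        }
    }

    /** Hypothetical monitor: one gauge per (column family, property) pair. */
    static final class Monitor implements Closeable {
        private final List<String> properties;
        private final Map<String, CachedGauge> gauges = new LinkedHashMap<>();

        Monitor(List<String> properties) {
            this.properties = properties;
        }

        /** Registers a gauge for every configured property under the column family name. */
        void registerColumnFamily(String columnFamilyName, ToLongFunction<String> reader) {
            for (String property : properties) {
                gauges.put(
                    columnFamilyName + "." + property,
                    new CachedGauge(() -> reader.applyAsLong(property)));
            }
        }

        void updateAll() {
            gauges.values().forEach(CachedGauge::update);
        }

        long read(String key) {
            return gauges.get(key).getValue();
        }

        @Override
        public void close() {
            gauges.values().forEach(CachedGauge::close);
        }
    }

    public static void main(String[] args) {
        long[] backing = {5L}; // pretend native counter
        Monitor monitor = new Monitor(Arrays.asList("num-keys"));
        monitor.registerColumnFamily("cf", property -> backing[0]);

        monitor.updateAll();
        System.out.println(monitor.read("cf.num-keys")); // prints 5

        backing[0] = 9L;
        monitor.close();
        monitor.updateAll(); // ignored: gauges are closed
        System.out.println(monitor.read("cf.num-keys")); // still prints 5
    }
}
```

The sketch does not try to make `update()` and `close()` atomic with respect to each other; a real implementation that reads from a native handle would need to decide how to guard against a read racing with the handle being released.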
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648194#comment-16648194 ] ASF GitHub Bot commented on FLINK-10423: StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224864142 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. 
+* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, +
[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224864142 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. 
All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. +* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, + @Nonnull ColumnFamilyHandle 
handle, + @Nonnull RocksDB db + ) { + this.property = property; + this.handle = handle; + this.db = db; +
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648180#comment-16648180 ] ASF GitHub Bot commented on FLINK-10423: StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224860246 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. 
+* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, +
[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224860246 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. 
All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. +* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, + @Nonnull ColumnFamilyHandle 
handle, + @Nonnull RocksDB db + ) { + this.property = property; + this.handle = handle; + this.db = db; +
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648178#comment-16648178 ] ASF GitHub Bot commented on FLINK-10423: StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224860246 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. 
+* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, +
[GitHub] StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224856170 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -606,6 +627,17 @@ private RocksDB openDB( Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), "Not all requested column family handles have been created"); + if (this.metricOptions.isEnabled()) { + this.nativeMetricMonitor = new RocksDBNativeMetricMonitor( + dbRef, + rocksDBResourceGuard, + metricOptions, + operatorMetricGroup + ); + + this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor); Review comment: It is not an issue in the sense of a bug, but it is much more explicit. `CloseableRegistry` is typically used to coordinate closing of resources across multiple threads, and I would not use it if we operate in just one thread. I think you need to close the metrics as the first thing in `dispose()`, or else it will block. As a rule of thumb, all closing in `dispose()` should happen in the reverse order of creation.
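The rule of thumb in the comment above (close in the reverse order of creation) can be illustrated with a small standalone sketch. This is not the code from the pull request: the class `ReverseOrderDispose` and the resource names are hypothetical stand-ins, and the resources are plain `AutoCloseable` lambdas that only record when they are opened and closed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch: resources are disposed in the reverse order of their creation. */
public class ReverseOrderDispose {
    // Stack of resources; the most recently created one sits on top.
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();

    // "Creates" a named resource and remembers how to close it.
    private void open(String name) {
        log.add("open " + name);
        resources.push(() -> log.add("close " + name));
    }

    // Pops resources one by one, so the last one created is closed first.
    void dispose() {
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                log.add("failed: " + e.getMessage());
            }
        }
    }

    static List<String> run() {
        ReverseOrderDispose backend = new ReverseOrderDispose();
        backend.open("db");
        backend.open("resourceGuard");
        backend.open("metricMonitor"); // created last, so it is closed first
        backend.dispose();
        return backend.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With this ordering the metric monitor, which was created last, is closed before the guard and the database it depends on.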
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648147#comment-16648147 ] ASF GitHub Bot commented on FLINK-10423: sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224855111 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. 
+* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, + @Nonnull
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648146#comment-16648146 ] ASF GitHub Bot commented on FLINK-10423: sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224854199 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -606,6 +627,17 @@ private RocksDB openDB( Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), "Not all requested column family handles have been created"); + if (this.metricOptions.isEnabled()) { + this.nativeMetricMonitor = new RocksDBNativeMetricMonitor( + dbRef, + rocksDBResourceGuard, + metricOptions, + operatorMetricGroup + ); + + this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor); Review comment: Why is this an issue? The cancelStreamRegistry is closed in `AbstractKeyedStateBackend#dispose`. I manually called close inside of `dispose` initially, but I was concerned about the strict ordering it imposed. If the nativeMetricsWatcher wasn't closed first, `rocksDBResourceGuard.close()` would block forever.
> Forward RocksDB native metrics to Flink metrics reporter > - > > Key: FLINK-10423 > URL: https://issues.apache.org/jira/browse/FLINK-10423 > Project: Flink > Issue Type: New Feature > Components: Metrics, State Backends, Checkpointing >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > > RocksDB contains a number of metrics at the column family level about current > memory usage, open memtables, etc. that would be useful to users wishing > greater insight into what RocksDB is doing. This work is inspired heavily by the > comments on this RocksDB issue thread > (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)
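The blocking behaviour described in this comment can be reproduced with a toy lease counter. The sketch below is an assumption-laden illustration, not Flink's `ResourceGuard`: `SimpleGuard`, `acquire`, and `closeWithin` are hypothetical names, and the close call takes a timeout so that the example terminates instead of blocking forever.

```java
/** Hypothetical sketch: closing a guard waits until every outstanding lease is released. */
public class LeaseGuardSketch {

    /** Toy lease-counting guard; a stand-in, not Flink's ResourceGuard. */
    static final class SimpleGuard {
        private int leases;
        private boolean closed;

        // Hands out a lease; closing the returned handle releases it.
        synchronized AutoCloseable acquire() {
            if (closed) {
                throw new IllegalStateException("guard already closed");
            }
            leases++;
            return this::release;
        }

        private synchronized void release() {
            leases--;
            notifyAll(); // wake up a thread waiting in closeWithin()
        }

        // Refuses new leases, then waits up to timeoutMillis for open leases to drain.
        // Returns true only if no lease is outstanding when it returns.
        synchronized boolean closeWithin(long timeoutMillis) throws InterruptedException {
            closed = true;
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (leases > 0) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false; // an unbounded close() would still be blocked here
                }
                wait(remaining);
            }
            return true;
        }
    }

    static boolean[] run() throws Exception {
        SimpleGuard guard = new SimpleGuard();
        AutoCloseable monitorLease = guard.acquire(); // the monitor holds a lease

        boolean closedWhileLeaseHeld = guard.closeWithin(50); // lease still open
        monitorLease.close();                                 // monitor closed first
        boolean closedAfterRelease = guard.closeWithin(50);   // nothing left to wait for

        return new boolean[] {closedWhileLeaseHeld, closedAfterRelease};
    }

    public static void main(String[] args) throws Exception {
        boolean[] result = run();
        System.out.println("closed while lease held: " + result[0]);
        System.out.println("closed after release:    " + result[1]);
    }
}
```

The first close attempt times out because the monitor's lease is still open; once the lease is released, the second attempt succeeds immediately. This is the strict ordering the comment refers to.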
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648142#comment-16648142 ] ASF GitHub Bot commented on FLINK-10423: sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224855415 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; Review comment: Agreed, I am going to push the lease into the gauge itself. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Forward RocksDB native metrics to Flink metrics reporter > - > > Key: FLINK-10423 > URL: https://issues.apache.org/jira/browse/FLINK-10423 > Project: Flink > Issue Type: New Feature > Components: Metrics, State Backends, Checkpointing >Reporter: Seth Wiesman >Assignee: Seth Wiesman >Priority: Major > Labels: pull-request-available > > RocksDB contains a number of metrics at the column family level about current > memory usage, open memtables, etc that would be useful to users wishing > greater insight what rocksdb is doing. 
This work is inspired heavily by the > comments on this rocksdb issue thread > (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224855415

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.ResourceGuard;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * A monitor which pull {{@link RocksDB}} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+
+    private final CloseableRegistry registeredGauges;
+
+    private final RocksDB db;
+
+    private final ResourceGuard.Lease lease;

Review comment:
Agreed, I am going to push the lease into the gauge itself.
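The review comment above proposes moving the lease out of the monitor and into each gauge. The following is only a minimal sketch of that idea, not the code from the pull request: `CountingGuard`, `Lease`, and `LeasedGauge` are hypothetical stand-ins invented for illustration and do not reproduce Flink's `ResourceGuard` or `RocksDBNativeMetricView`. The assumption is that each gauge acquires its own lease when it is created and returns it exactly once when it is closed.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class LeaseInGaugeSketch {

    /** Hypothetical lease handle; releasing it returns the slot to the guard. */
    interface Lease {
        void release();
    }

    /** Hypothetical stand-in for a resource guard: it only counts outstanding leases. */
    static final class CountingGuard {
        private final AtomicInteger outstanding = new AtomicInteger();

        Lease acquire() {
            outstanding.incrementAndGet();
            AtomicBoolean released = new AtomicBoolean(false);
            // The lease decrements the counter at most once, even if released twice.
            return () -> {
                if (released.compareAndSet(false, true)) {
                    outstanding.decrementAndGet();
                }
            };
        }

        int outstanding() {
            return outstanding.get();
        }
    }

    /** A gauge that owns its lease, so closing the gauge is what frees the guard. */
    static final class LeasedGauge {
        private final LongSupplier source;
        private final Lease lease;
        private volatile boolean open = true;
        private long lastValue;

        LeasedGauge(CountingGuard guard, LongSupplier source) {
            this.lease = guard.acquire();
            this.source = source;
        }

        /** Reads a fresh value while open; afterwards returns the last value seen. */
        long getValue() {
            if (open) {
                lastValue = source.getAsLong();
            }
            return lastValue;
        }

        void close() {
            open = false;
            lease.release();
        }
    }

    public static void main(String[] args) {
        CountingGuard guard = new CountingGuard();
        LeasedGauge first = new LeasedGauge(guard, () -> 42L);
        LeasedGauge second = new LeasedGauge(guard, () -> 7L);
        System.out.println("leases after registering two gauges: " + guard.outstanding());

        first.close();
        second.close();
        System.out.println("leases after closing both gauges: " + guard.outstanding());
    }
}
```

With this shape the monitor no longer needs a lease field of its own; it only has to close the gauges it registered.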
[GitHub] sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224854199

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##
@@ -606,6 +627,17 @@ private RocksDB openDB(
     Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
         "Not all requested column family handles have been created");

+    if (this.metricOptions.isEnabled()) {
+        this.nativeMetricMonitor = new RocksDBNativeMetricMonitor(
+            dbRef,
+            rocksDBResourceGuard,
+            metricOptions,
+            operatorMetricGroup
+        );
+
+        this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor);

Review comment:
Why is this an issue? The cancelStreamRegistry is closed in `AbstractKeyedStateBackend#dispose`. I initially called close manually inside `dispose`, but I was concerned about the strict ordering it imposed: if the nativeMetricsWatcher wasn't closed first, `rocksDBResourceGuard.close()` would block forever.
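The ordering concern in the review comment above can be illustrated with a small sketch. It assumes a guard whose close operation waits until every lease has been returned, which is the behaviour the comment describes for `rocksDBResourceGuard.close()`. `BlockingGuard` and `closeWithin` are hypothetical names made up for this example; the timeout exists only so that the demonstration terminates instead of blocking forever.

```java
public class CloseOrderingSketch {

    /** Hypothetical stand-in for a lease-counting guard; not Flink's ResourceGuard. */
    static final class BlockingGuard {
        private int leases;
        private boolean closed;

        /** Hands out a lease; running the returned Runnable gives the lease back. */
        synchronized Runnable acquire() {
            if (closed) {
                throw new IllegalStateException("guard is already closed");
            }
            leases++;
            return this::release;
        }

        private synchronized void release() {
            leases--;
            notifyAll();
        }

        /**
         * Waits up to timeoutMillis for all leases to be returned.
         * Returns true if the guard could be closed, false if leases were still out.
         */
        synchronized boolean closeWithin(long timeoutMillis) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (leases > 0) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            closed = true;
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingGuard guard = new BlockingGuard();
        Runnable monitorLease = guard.acquire();

        // Wrong order: the guard is closed while the monitor still holds its lease.
        System.out.println("guard closed before monitor: " + guard.closeWithin(50)); // false

        // Right order: the monitor returns its lease first, then the guard can close.
        monitorLease.run();
        System.out.println("guard closed after monitor: " + guard.closeWithin(50)); // true
    }
}
```

Without the timeout, the first call would never return, which is the "block forever" case. Registering the monitor with a registry that is closed earlier in `dispose` is one way to avoid relying on a hand-maintained call order.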
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648141#comment-16648141 ] ASF GitHub Bot commented on FLINK-10423: sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224855111 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. 
+* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, + @Nonnull
[GitHub] sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224855111 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. 
All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. +* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, + @Nonnull ColumnFamilyHandle 
handle, + @Nonnull RocksDB db + ) { + this.property = property; + this.handle = handle; + this.db = db; +
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648136#comment-16648136 ]

ASF GitHub Bot commented on FLINK-10423:

sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224854199

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##
@@ -606,6 +627,17 @@ private RocksDB openDB(
     Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
         "Not all requested column family handles have been created");

+    if (this.metricOptions.isEnabled()) {
+        this.nativeMetricMonitor = new RocksDBNativeMetricMonitor(
+            dbRef,
+            rocksDBResourceGuard,
+            metricOptions,
+            operatorMetricGroup
+        );
+
+        this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor);

Review comment:
Why is this an issue? The cancelStreamRegistry is closed in `AbstractKeyedStateBackend#dispose`. I first called close manually inside `dispose`, but I was concerned about the strict ordering it imposed, because if the nativeMetricsWatcher wasn't closed first, `rocksDBResourceGuard.close()` would block forever.

> Forward RocksDB native metrics to Flink metrics reporter
> ---------------------------------------------------------
>
> Key: FLINK-10423
> URL: https://issues.apache.org/jira/browse/FLINK-10423
> Project: Flink
> Issue Type: New Feature
> Components: Metrics, State Backends, Checkpointing
> Reporter: Seth Wiesman
> Assignee: Seth Wiesman
> Priority: Major
> Labels: pull-request-available
>
> RocksDB contains a number of metrics at the column family level about current
> memory usage, open memtables, etc. that would be useful to users wishing for
> greater insight into what RocksDB is doing. This work is inspired heavily by the
> comments on this rocksdb issue thread
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
sjwiesman commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
URL: https://github.com/apache/flink/pull/6814#discussion_r224854199

## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ##
@@ -606,6 +627,17 @@ private RocksDB openDB(
     Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
         "Not all requested column family handles have been created");

+    if (this.metricOptions.isEnabled()) {
+        this.nativeMetricMonitor = new RocksDBNativeMetricMonitor(
+            dbRef,
+            rocksDBResourceGuard,
+            metricOptions,
+            operatorMetricGroup
+        );
+
+        this.cancelStreamRegistry.registerCloseable(nativeMetricMonitor);

Review comment:
Why is this an issue? The cancelStreamRegistry is closed in `AbstractKeyedStateBackend#dispose`. I first called close manually inside `dispose`, but I was concerned about the strict ordering it imposed, because if the nativeMetricsWatcher wasn't closed first, `rocksDBResourceGuard.close()` would block forever.
[jira] [Commented] (FLINK-10516) YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
[ https://issues.apache.org/jira/browse/FLINK-10516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648131#comment-16648131 ] ASF GitHub Bot commented on FLINK-10516: yanyan300300 closed pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup URL: https://github.com/apache/flink/pull/6828 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 462682f7e5f..f3dd27b5ef0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -67,6 +68,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.Callable; @@ -160,6 +162,13 @@ protected int run(String[] args) { final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties); + // configure the filesystems + try { + FileSystem.initialize(flinkConfig); + } catch (IOException e) { + throw new IOException("Error while configuring the filesystems.", e); + } + 
File f = new File(currDir, Utils.KEYTAB_FILE_NAME); if (remoteKeytabPrincipal != null && f.exists()) { String keytabPath = f.getAbsolutePath(); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java index b15374b2a4b..929dbdbce3b 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java @@ -19,9 +19,12 @@ package org.apache.flink.yarn; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.util.OperatingSystem; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Assume; @@ -29,9 +32,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +56,7 @@ import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -56,6 +65,8 @@ /** * Tests for the {@link 
YarnApplicationMasterRunner}. */ +@PrepareForTest(FileSystem.class) +@RunWith(PowerMockRunner.class) public class YarnApplicationMasterRunnerTest { private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class); @@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { taskManagerConf, workingDirectory, taskManagerMainClass, LOG); assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme()); } + + @Test + public void testRunAndInitializeFileSystem() throws
[GitHub] yanyan300300 closed pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup
yanyan300300 closed pull request #6828: [FLINK-10516] [yarn] fix YarnApplicationMasterRunner fail to initialize FileSystem with correct Flink Configuration during setup URL: https://github.com/apache/flink/pull/6828 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 462682f7e5f..f3dd27b5ef0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -27,6 +27,7 @@ import org.apache.flink.configuration.ResourceManagerOptions; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; @@ -67,6 +68,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.Callable; @@ -160,6 +162,13 @@ protected int run(String[] args) { final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties); + // configure the filesystems + try { + FileSystem.initialize(flinkConfig); + } catch (IOException e) { + throw new IOException("Error while configuring the filesystems.", e); + } + File f = new File(currDir, Utils.KEYTAB_FILE_NAME); if (remoteKeytabPrincipal != null && f.exists()) { String keytabPath = f.getAbsolutePath(); diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java index b15374b2a4b..929dbdbce3b 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java @@ -19,9 +19,12 @@ package org.apache.flink.yarn; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.util.OperatingSystem; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Assume; @@ -29,9 +32,14 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +56,7 @@ import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -56,6 +65,8 @@ /** * Tests for the {@link YarnApplicationMasterRunner}. 
*/ +@PrepareForTest(FileSystem.class) +@RunWith(PowerMockRunner.class) public class YarnApplicationMasterRunnerTest { private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class); @@ -109,4 +120,37 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { taskManagerConf, workingDirectory, taskManagerMainClass, LOG); assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme()); } + + @Test + public void testRunAndInitializeFileSystem() throws Exception { + // Mock necessary system variables + Map map = new HashMap(System.getenv()); + map.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo"); + // Create dynamic properties to be used
[GitHub] sunjincheng121 commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429392078

Thanks for the update @fhueske! The current change LGTM. +1 to merge.
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648120#comment-16648120 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on issue #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#issuecomment-429392078

Thanks for the update @fhueske! The current change LGTM. +1 to merge.

> Drop the Table.writeToSink() method
> -----------------------------------
>
> Key: FLINK-10156
> URL: https://issues.apache.org/jira/browse/FLINK-10156
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Major
> Labels: pull-request-available
>
> I am proposing to drop the {{Table.writeToSink()}} method.
>
> *What is the method doing?*
> The {{Table.writeToSink(TableSink)}} method emits a {{Table}} via a
> {{TableSink}}, for example to a Kafka topic, a file, or a database.
>
> *Why should it be removed?*
> The {{writeToSink()}} method was introduced before the Table API supported
> the {{Table.insertInto(String)}} method. The {{insertInto()}} method writes a
> table into a table that was previously registered with a {{TableSink}} in the
> catalog. It is the inverse method to the {{scan()}} method and the equivalent
> to an {{INSERT INTO ... SELECT}} SQL query.
>
> I think we should remove {{writeToSink()}} for the following reasons:
> 1. It offers the same functionality as {{insertInto()}}. Removing it would
> reduce duplicated API.
> 2. {{writeToSink()}} requires a {{TableSink}} instance. I think TableSinks
> (and TableSources) should only be registered with the {{TableEnvironment}}
> and not be exposed to the "query part" of the Table API / SQL.
> 3. Registering tables in a catalog and using them for input and output is
> more aligned with SQL.
>

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224851062

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ##
@@ -944,7 +956,12 @@ class Table(
    * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written.
    */
  def insertInto(tableName: String): Unit = {
-    tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+    this.logicalPlan match {
+      case _: LogicalTableFunctionCall =>
+        throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.")
+      case _ =>
+        tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)

Review comment:
If we check for NULL, we should throw an exception, but I think you're right. I agree to keep the current approach.
[jira] [Commented] (FLINK-10398) Add Tanh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648092#comment-16648092 ]

ASF GitHub Bot commented on FLINK-10398:

yanghua commented on a change in pull request #6736: [FLINK-10398][table] Add Tanh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6736#discussion_r224845010

## File path: docs/dev/table/functions.md ##

@@ -1219,6 +1219,18 @@ TAN(numeric)
+
+
+{% highlight text %}
+TANH(numeric)
+{% endhighlight %}
+
+
+Returns the hyperbolic tangent of NUMERIC.
+Return value type is DOUBLE.

Review comment:
In fact, I just followed the [same pattern](https://github.com/apache/flink/pull/6700/files#diff-51c7d3bf736c60e8b942791a3715cbffR1284) as the cosh PR, which looked good to Piotr, and Piotr's advice that [the type is best written uniformly](https://github.com/apache/flink/pull/6700#discussion_r220892667). You did not object to this point before, but here you suggest that I change it to lowercase. Your guidelines seem vague.

However, part of the problem is mine. I should have added these functions in the same PR to make the change more effective. There are a few points that I probably didn't pay enough attention to, but I think many of the points you mentioned earlier are about documentation. Documentation wording has no definite criteria: what you consider standard, another person may disagree with. That caused a lot of back-and-forth.

Also, if you're willing to review, I think it would be more contributor-friendly to give one complete review conclusion at a time (except for new problems arising from follow-up changes). Each partial round of review suggestions, some of which are unrelated to my last change, creates more back-and-forth.

> Add Tanh math function supported in Table API and SQL
> -----------------------------------------------------
>
>                 Key: FLINK-10398
>                 URL: https://issues.apache.org/jira/browse/FLINK-10398
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Minor
>              Labels: pull-request-available
>
> refer to : https://www.techonthenet.com/oracle/functions/tanh.php

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10508) Port JobManagerITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648089#comment-16648089 ]

ASF GitHub Bot commented on FLINK-10508:

TisonKun opened a new pull request #6834: [WIP] [FLINK-10508] [tests] Port JobManagerITCase to new code base
URL: https://github.com/apache/flink/pull/6834

## What is the purpose of the change

Port `JobManagerITCase` to the new code base.

## Brief change log

Technically, the legacy `JobManagerITCase` tests a series of job executions (different jobs, succeeding or failing) by starting a `TestingCluster`. I therefore think the most convenient way to port this test is to use `MiniCluster` and run the ITCase on it. Following this thought, I ported the test to `MiniClusterITCase`.

## Verifying this change

The changes themselves are tests.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (**no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**no**)
- The serializers: (**no**)
- The runtime per-record code paths (performance sensitive): (**no**)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**no**)
- The S3 file system connector: (**no**)

## Documentation

- Does this pull request introduce a new feature? (**no**)
- If yes, how is the feature documented? (**no**)

> Port JobManagerITCase to new code base
> --------------------------------------
>
>                 Key: FLINK-10508
>                 URL: https://issues.apache.org/jira/browse/FLINK-10508
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.7.0
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
> Port {{JobManagerITCase}} to new code base.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10508) Port JobManagerITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10508: --- Labels: pull-request-available (was: ) > Port JobManagerITCase to new code base > -- > > Key: FLINK-10508 > URL: https://issues.apache.org/jira/browse/FLINK-10508 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: TisonKun >Assignee: TisonKun >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Port {{JobManagerITCase}} to new code base. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648078#comment-16648078 ] ASF GitHub Bot commented on FLINK-9697: --- yanghua commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0 URL: https://github.com/apache/flink/pull/6703#issuecomment-429376594 @aljoscha OK, I will refactor the commits and commit messages. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide connector for Kafka 2.0.0 > - > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Kafka 2.0.0 would be released soon. > Here is vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation : > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648072#comment-16648072 ]

ASF GitHub Bot commented on FLINK-9697:

aljoscha commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0
URL: https://github.com/apache/flink/pull/6703#issuecomment-429374391

@yanghua Thanks for iterating on this so quickly! I will do a last thorough review next week and then hopefully merge.

To make the reviewing easier, you could restructure the commits a bit. All prerequisite work should go into isolated commits, and the last commit should contain only the new files for the Kafka 2.0 connector. For example, the dependency change would be separate, and renaming `KafkaTableSource` to `KafkaTableSourceBase` can be separate, so that it is clearer what each change does. This makes it much easier in the future when someone looks at the changes and tries to figure out what was going on.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10384) Add Sinh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648069#comment-16648069 ] ASF GitHub Bot commented on FLINK-10384: xccui commented on a change in pull request #6730: [FLINK-10384][table] Add Sinh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6730#discussion_r224835409 ## File path: docs/dev/table/functions.md ## @@ -1197,6 +1197,18 @@ SIN(numeric) + + +{% highlight text %} +SINH(numeric) +{% endhighlight %} + + +Returns the hyperbolic sine of NUMERIC. +Return value type is DOUBLE. Review comment: Change `NUMERIC` to lowercase. The return type is ... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Sinh math function supported in Table API and SQL > - > > Key: FLINK-10384 > URL: https://issues.apache.org/jira/browse/FLINK-10384 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > like FLINK-10340 for adding Cosh math function -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10398) Add Tanh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648070#comment-16648070 ] ASF GitHub Bot commented on FLINK-10398: xccui commented on a change in pull request #6736: [FLINK-10398][table] Add Tanh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6736#discussion_r224835500 ## File path: docs/dev/table/functions.md ## @@ -1219,6 +1219,18 @@ TAN(numeric) + + +{% highlight text %} +TANH(numeric) +{% endhighlight %} + + +Returns the hyperbolic tangent of NUMERIC. +Return value type is DOUBLE. Review comment: Change the `NUMERIC` to lowercase here. The return type is ... Please ALWAYS be careful with these minor issues cause they have been pointed out many times. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add Tanh math function supported in Table API and SQL > - > > Key: FLINK-10398 > URL: https://issues.apache.org/jira/browse/FLINK-10398 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: vinoyang >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > refer to : https://www.techonthenet.com/oracle/functions/tanh.php -- This message was sent by Atlassian JIRA (v7.6.3#76005)
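For readers following the TANH documentation thread, here is a minimal sketch of what "returns the hyperbolic tangent of numeric, return type DOUBLE" means numerically. It is plain Scala, not Flink code: `TanhSketch` and `tanhByDefinition` are hypothetical names, and the formula is the textbook definition tanh(x) = (e^x - e^-x) / (e^x + e^-x), compared against the standard library's `math.tanh`.

```scala
object TanhSketch {
  // Textbook definition of the hyperbolic tangent.
  // Numerically naive: for large |x| the exponentials overflow and the
  // quotient becomes NaN, so real code should prefer math.tanh.
  def tanhByDefinition(x: Double): Double = {
    val ePos = math.exp(x)
    val eNeg = math.exp(-x)
    (ePos - eNeg) / (ePos + eNeg)
  }

  def main(args: Array[String]): Unit = {
    // Any numeric input is widened to Double before evaluation,
    // and the result is always a Double.
    val inputs: Seq[Double] = Seq(-1.0, 0.0, 0.5, 2)
    inputs.foreach { x =>
      val diff = math.abs(tanhByDefinition(x) - math.tanh(x))
      println(s"x=$x definition-vs-library difference below 1e-12: ${diff < 1e-12}")
    }
  }
}
```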
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648049#comment-16648049 ]

ASF GitHub Bot commented on FLINK-10156:

fhueske commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224831240

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ##

@@ -944,7 +956,12 @@ class Table(
     * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written.
     */
   def insertInto(tableName: String): Unit = {
-    tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+    this.logicalPlan match {
+      case _: LogicalTableFunctionCall =>
+        throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.")
+      case _ =>
+        tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)

Review comment:
The check is basically replacing the null check but provides more context. What would we do if `tableEnv` is `null`? Do nothing? Throw an exception? What would be the error message?

By checking for the type of the root node of the logical plan, we know why the call would not succeed and can give an appropriate error message.

We could of course also pass `null` as configuration and check for it to be null in `insertInto(String, QueryConfig)`, but I'd consider that an invalid use of the interface that should also result in an exception.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
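The argument in fhueske's comment, that matching on the root node of the plan gives a more informative failure than a null check, can be sketched in isolation. The types below (`PlanNode`, `TableFunctionCallNode`, `ProjectNode`, `ValidationError`, `InsertCheck`) are hypothetical simplifications made up for this example, not the classes in `table.scala`; only the error message text is taken from the diff under review.

```scala
// Hypothetical, simplified plan nodes; not Flink's real classes.
sealed trait PlanNode
case object TableFunctionCallNode extends PlanNode
case object ProjectNode extends PlanNode

// Stand-in for a validation exception type.
final class ValidationError(message: String) extends RuntimeException(message)

object InsertCheck {
  // Rejects plans whose root is a bare table function call, with a message
  // that explains the cause; every other plan passes through unchanged.
  def validateForInsert(root: PlanNode): PlanNode = root match {
    case TableFunctionCallNode =>
      throw new ValidationError(
        "TableFunction can only be used in join and leftOuterJoin.")
    case other =>
      other
  }

  def main(args: Array[String]): Unit = {
    println(validateForInsert(ProjectNode))
    try validateForInsert(TableFunctionCallNode)
    catch { case e: ValidationError => println(s"rejected: ${e.getMessage}") }
  }
}
```

A plain null check at the same place could only report that something was missing, whereas the match states which usage is unsupported.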
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648041#comment-16648041 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224828122

## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala ##

@@ -177,10 +195,16 @@ class TableSinkITCase extends AbstractTestBase {
       .assignAscendingTimestamps(_._1.toLong)
       .toTable(tEnv, 'id, 'num, 'text)

+    tEnv.registerTableSink(
+      "retractSink",
+      new TestRetractSink().configure(
+        Array[String]("len", "icnt", "nsum"),
+        Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
     t.select('id, 'num, 'text.charLength() as 'len)
       .groupBy('len)
-      .select('len, 'id.count, 'num.sum)
-      .writeToSink(new TestRetractSink)
+      .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)

Review comment:
Sounds good to me.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10156) Drop the Table.writeToSink() method
[ https://issues.apache.org/jira/browse/FLINK-10156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648039#comment-16648039 ]

ASF GitHub Bot commented on FLINK-10156:

sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
URL: https://github.com/apache/flink/pull/6805#discussion_r224827916

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ##

@@ -944,7 +956,12 @@ class Table(
     * @param tableName Name of the registered [[TableSink]] to which the [[Table]] is written.
     */
   def insertInto(tableName: String): Unit = {
-    tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)
+    this.logicalPlan match {
+      case _: LogicalTableFunctionCall =>
+        throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.")
+      case _ =>
+        tableEnv.insertInto(this, tableName, this.tableEnv.queryConfig)

Review comment:
I feel that putting the same code in the two overloaded methods is not the best way. We should check `tableEnv` for NULL before we call `insertInto(tableName, this.tableEnv.queryConfig)`. Does that make sense to you?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9697) Provide connector for Kafka 2.0.0
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648042#comment-16648042 ] ASF GitHub Bot commented on FLINK-9697: --- yanghua commented on issue #6703: [FLINK-9697] Provide connector for Kafka 2.0.0 URL: https://github.com/apache/flink/pull/6703#issuecomment-429367779 @aljoscha it compiles and the tests pass. What should I do next? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide connector for Kafka 2.0.0 > - > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > Kafka 2.0.0 will be released soon. > Here is the vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide a connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation: > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink()
sunjincheng121 commented on a change in pull request #6805: [FLINK-10156][table] Deprecate Table.writeToSink() URL: https://github.com/apache/flink/pull/6805#discussion_r224828122 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala ## @@ -177,10 +195,16 @@ class TableSinkITCase extends AbstractTestBase { .assignAscendingTimestamps(_._1.toLong) .toTable(tEnv, 'id, 'num, 'text) +tEnv.registerTableSink( + "retractSink", + new TestRetractSink().configure( +Array[String]("len", "icnt", "nsum"), +Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG))) + t.select('id, 'num, 'text.charLength() as 'len) .groupBy('len) - .select('len, 'id.count, 'num.sum) - .writeToSink(new TestRetractSink) + .select('len, 'id.count as 'icnt, 'num.sum as 'nsum) Review comment: Sounds good to me. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10423) Forward RocksDB native metrics to Flink metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647998#comment-16647998 ] ASF GitHub Bot commented on FLINK-10423: StefanRRichter commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224817432 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. 
+* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, +
[GitHub] zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor
zentol commented on a change in pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb native metrics monitor URL: https://github.com/apache/flink/pull/6814#discussion_r224761096 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.ResourceGuard; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A monitor which pull {{@link RocksDB}} native metrics + * and forwards them to Flink's metric group. 
All metrics are + * unsigned longs and are reported at the column family level. + */ +@Internal +public class RocksDBNativeMetricMonitor implements Closeable { + + private final CloseableRegistry registeredGauges; + + private final RocksDB db; + + private final ResourceGuard.Lease lease; + + private final RocksDBNativeMetricOptions options; + + private final MetricGroup metricGroup; + + RocksDBNativeMetricMonitor( + @Nonnull RocksDB db, + @Nonnull ResourceGuard guard, + @Nonnull RocksDBNativeMetricOptions options, + @Nonnull MetricGroup metricGroup + ) throws IOException { + this.db = db; + this.lease = guard.acquireResource(); + this.options = options; + this.metricGroup = metricGroup; + + this.registeredGauges = new CloseableRegistry(); + } + + /** +* Register gauges to pull native metrics for the column family. +* @param columnFamilyName group name for the new gauges +* @param handle native handle to the column family +*/ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + try { + MetricGroup group = metricGroup.addGroup(columnFamilyName); + + for (String property : options.getProperties()) { + RocksDBNativeMetricView gauge = new RocksDBNativeMetricView( + property, + handle, + db + ); + + group.gauge(property, gauge); + registeredGauges.registerCloseable(gauge); + } + } catch (IOException e) { + throw new FlinkRuntimeException("Unable to register native metrics with RocksDB", e); + } + } + + @Override + public void close() { + IOUtils.closeQuietly(registeredGauges); + IOUtils.closeQuietly(lease); + } + + static class RocksDBNativeMetricView implements Gauge, View, Closeable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBNativeMetricView.class); + + private final String property; + + private final ColumnFamilyHandle handle; + + private final RocksDB db; + + private volatile boolean open; + + private long value; + + private RocksDBNativeMetricView( + @Nonnull String property, + @Nonnull ColumnFamilyHandle 
handle, + @Nonnull RocksDB db + ) { + this.property = property; + this.handle = handle; + this.db = db; +
[GitHub] TisonKun closed pull request #6830: [hotfix] [jobmaster] refactor suspendExecutionGraph and clearExecutio…
TisonKun closed pull request #6830: [hotfix] [jobmaster] refactor suspendExecutionGraph and clearExecutio…
URL: https://github.com/apache/flink/pull/6830

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 31c6a97124b..f8d8362274d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1193,21 +1193,20 @@ private void suspendAndClearExecutionGraphFields(Exception cause) {

 	private void suspendExecutionGraph(Exception cause) {
 		executionGraph.suspend(cause);
+	}

+	private void clearExecutionGraphFields() {
 		if (jobManagerJobMetricGroup != null) {
 			jobManagerJobMetricGroup.close();
+			jobManagerJobMetricGroup = null;
 		}

 		if (jobStatusListener != null) {
 			jobStatusListener.stop();
+			jobStatusListener = null;
 		}
 	}

-	private void clearExecutionGraphFields() {
-		jobManagerJobMetricGroup = null;
-		jobStatusListener = null;
-	}
-
 	/**
 	 * Dispose the savepoint stored under the given path.
 	 *

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
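The effect of the refactoring in this diff is that each field is closed and set to null in the same guarded step, so the clearing method can be called more than once without closing a resource twice. A minimal sketch of that pattern follows; `Resource` and `Holder` are hypothetical names made up for the illustration and are not taken from `JobMaster`.

```java
// Toy model only: Resource and Holder are hypothetical, not Flink classes.
final class ClearFieldsSketch {

    /** Hypothetical closeable resource that counts how often it was closed. */
    static final class Resource {
        boolean closed;
        int closeCalls;

        void close() {
            closed = true;
            closeCalls++;
        }
    }

    /** Hypothetical owner of a nullable resource field. */
    static final class Holder {
        Resource metricGroup = new Resource();

        void clearFields() {
            if (metricGroup != null) {
                metricGroup.close();
                // Forget the reference in the same step, so a second call is a no-op.
                metricGroup = null;
            }
        }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        Resource resource = holder.metricGroup;
        holder.clearFields();
        holder.clearFields(); // safe: the field is already null
        System.out.println(resource.closeCalls);
    }
}
```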
[GitHub] TisonKun commented on issue #6830: [hotfix] [jobmaster] refactor suspendExecutionGraph and clearExecutio…
TisonKun commented on issue #6830: [hotfix] [jobmaster] refactor suspendExecutionGraph and clearExecutio… URL: https://github.com/apache/flink/pull/6830#issuecomment-429349183 @tillrohrmann OK, could you take it over and add it as a hotfix to that branch? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10534) Add idle timeout for a flink session cluster
[ https://issues.apache.org/jira/browse/FLINK-10534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647972#comment-16647972 ] ouyangzhe commented on FLINK-10534: --- Thanks [~till.rohrmann]. A Flink session cluster is created in two cases: 1. A user starts a Flink session through yarn-session.sh. 2. A user runs Flink programs in interactive mode: a Flink session cluster is started first and may be shut down if the user programs finish properly, but if the user's main program crashes or is killed, the session cluster may be left behind. So I propose adding an idle timeout configuration to give users a way to handle this problem. > Add idle timeout for a flink session cluster > > > Key: FLINK-10534 > URL: https://issues.apache.org/jira/browse/FLINK-10534 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Affects Versions: 1.7.0 >Reporter: ouyangzhe >Assignee: ouyangzhe >Priority: Major > Attachments: 屏幕快照 2018-10-12 上午10.24.08.png > > > The Flink session cluster on YARN keeps running even when no jobs are > running at all, occupying YARN resources for no use. > TaskManagers are released after an idle timeout, but the JobManager keeps > running. > I propose adding a configuration option that sets an idle timeout for the JobManager > too: if no job is running after the specified timeout, the Flink cluster shuts > itself down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
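The proposed behaviour can be sketched as a small tracker that remembers when the cluster last became idle. This is a minimal sketch under assumptions, not the Flink implementation: `IdleTracker`, its method names, and the injected clock are all hypothetical, and how the timeout would be configured or wired into the JobManager is not covered by the issue text.

```java
import java.util.function.LongSupplier;

// Toy model only: IdleTracker is a hypothetical helper, not a Flink class.
final class IdleTimeoutSketch {

    /** Tracks running jobs and reports when the cluster has been idle for too long. */
    static final class IdleTracker {
        private final long idleTimeoutMillis;
        private final LongSupplier clock; // injected so the sketch is testable
        private int runningJobs;
        private long idleSinceMillis;

        IdleTracker(long idleTimeoutMillis, LongSupplier clock) {
            this.idleTimeoutMillis = idleTimeoutMillis;
            this.clock = clock;
            this.idleSinceMillis = clock.getAsLong(); // a fresh cluster starts out idle
        }

        void jobStarted() {
            runningJobs++;
        }

        void jobFinished() {
            if (runningJobs == 0) {
                throw new IllegalStateException("No running job to finish");
            }
            runningJobs--;
            if (runningJobs == 0) {
                idleSinceMillis = clock.getAsLong(); // idle period starts now
            }
        }

        /** True only if no job is running and the idle period has reached the timeout. */
        boolean shouldShutDown() {
            return runningJobs == 0
                && clock.getAsLong() - idleSinceMillis >= idleTimeoutMillis;
        }
    }

    public static void main(String[] args) {
        long[] now = {0L};
        IdleTracker tracker = new IdleTracker(1000L, () -> now[0]);
        tracker.jobStarted();
        now[0] = 5000L;
        System.out.println(tracker.shouldShutDown()); // false: a job is still running
        tracker.jobFinished();
        now[0] = 6000L;
        System.out.println(tracker.shouldShutDown()); // true: idle for the full timeout
    }
}
```

A periodic check of `shouldShutDown()` would cover both cases from the comment above, including a session left behind after the user's main program crashed.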
[GitHub] fhueske commented on a change in pull request #6792: [FLINK-10474][table] Don't translate IN/NOT_IN to JOIN with VALUES
fhueske commented on a change in pull request #6792: [FLINK-10474][table] Don't translate IN/NOT_IN to JOIN with VALUES URL: https://github.com/apache/flink/pull/6792#discussion_r224801724 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/ConvertToNotInOrInRule.scala ## @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.common + +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil} +import org.apache.calcite.rel.core.Filter +import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode} +import org.apache.calcite.sql.SqlBinaryOperator +import org.apache.calcite.sql.fun.SqlStdOperatorTable.{IN, NOT_IN, EQUALS, NOT_EQUALS, AND, OR} +import org.apache.calcite.tools.RelBuilder + +import scala.collection.JavaConversions._ +import scala.collection.mutable + +/** + * Rule for converting a cascade of predicates to [[IN]] or [[NOT_IN]]. + * + * For example, convert predicate: (x = 1 OR x = 2 OR x = 3) AND y = 4 to + * predicate: x IN (1, 2, 3) AND y = 4. 
+ * + * @param fromOperator The fromOperator, for example, when convert to [[IN]], fromOperator is + * [[EQUALS]]. We convert a cascade of [[EQUALS]] to [[IN]]. + * @param connectOperator The connect operator to connect the fromOperator. + * @param composedOperator The composed operator that may contains sub [[IN]] or [[NOT_IN]]. + * @param toOperator The toOperator, for example, when convert to [[IN]], toOperator is + * [[IN]]. We convert a cascade of [[EQUALS]] to [[IN]]. + * @param description The description of the rule. + */ +class ConvertToNotInOrInRule( +fromOperator: SqlBinaryOperator, Review comment: Only pass the `toOperator` parameter and set all other parameters appropriately? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
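The rewrite described in the Scaladoc above, together with the review suggestion to pass only the target operator, can be sketched on a toy predicate model. Everything here is an assumption made for illustration: `Target`, `Comparison`, `rewrite`, and the `threshold` parameter are hypothetical, the sketch works on strings rather than Calcite `RexNode`s, and it moves untouched comparisons to the front of the result. It relies only on the equivalences `x = 1 OR x = 2` to `x IN (1, 2)` and `x <> 1 AND x <> 2` to `x NOT IN (1, 2)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model only: not the Calcite-based rule from the pull request.
final class InRewriteSketch {

    /** Target operator; the comparison and connective it replaces are derived from it. */
    enum Target {
        IN("=", "OR"),
        NOT_IN("<>", "AND");

        final String fromOperator;
        final String connectOperator;

        Target(String fromOperator, String connectOperator) {
            this.fromOperator = fromOperator;
            this.connectOperator = connectOperator;
        }
    }

    /** A single comparison of a column against an integer literal, e.g. x = 1. */
    static final class Comparison {
        final String column;
        final String operator;
        final int literal;

        Comparison(String column, String operator, int literal) {
            this.column = column;
            this.operator = operator;
            this.literal = literal;
        }
    }

    /**
     * Rewrites a chain of comparisons that are joined by target.connectOperator.
     * Returns the terms of the rewritten chain, still joined by the same connective.
     */
    static List<String> rewrite(List<Comparison> chain, Target target, int threshold) {
        Map<String, List<Integer>> literalsByColumn = new LinkedHashMap<>();
        List<String> result = new ArrayList<>();
        for (Comparison c : chain) {
            if (c.operator.equals(target.fromOperator)) {
                literalsByColumn.computeIfAbsent(c.column, k -> new ArrayList<>()).add(c.literal);
            } else {
                // Comparisons with other operators are kept unchanged.
                result.add(c.column + " " + c.operator + " " + c.literal);
            }
        }
        for (Map.Entry<String, List<Integer>> entry : literalsByColumn.entrySet()) {
            List<Integer> literals = entry.getValue();
            if (literals.size() >= threshold) {
                String joined = literals.stream().map(String::valueOf).collect(Collectors.joining(", "));
                result.add(entry.getKey() + " " + target.name().replace('_', ' ') + " (" + joined + ")");
            } else {
                // Too few literals: keep the original comparisons.
                for (int literal : literals) {
                    result.add(entry.getKey() + " " + target.fromOperator + " " + literal);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Comparison> chain = Arrays.asList(
            new Comparison("x", "=", 1),
            new Comparison("x", "=", 2),
            new Comparison("x", "=", 3));
        System.out.println(rewrite(chain, Target.IN, 2)); // prints [x IN (1, 2, 3)]
    }
}
```

Deriving `fromOperator` and `connectOperator` from the target keeps the three values from being combined inconsistently, which appears to be the motivation behind the review comment.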
[jira] [Commented] (FLINK-10474) Don't translate IN with Literals to JOIN with VALUES for streaming queries
[ https://issues.apache.org/jira/browse/FLINK-10474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16647957#comment-16647957 ] ASF GitHub Bot commented on FLINK-10474: fhueske commented on a change in pull request #6792: [FLINK-10474][table] Don't translate IN/NOT_IN to JOIN with VALUES URL: https://github.com/apache/flink/pull/6792#discussion_r224778582 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala ## @@ -351,4 +351,43 @@ class CalcITCase extends AbstractTestBase { "{9=Comment#3}") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testIn(): Unit = { Review comment: Same as for the streaming SQL test. Merge with an existing method to keep the overhead low. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Don't translate IN with Literals to JOIN with VALUES for streaming queries > -- > > Key: FLINK-10474 > URL: https://issues.apache.org/jira/browse/FLINK-10474 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.6.1, 1.7.0 >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > > IN predicates with literals are translated to JOIN with VALUES if the number > of elements in the IN clause exceeds a certain threshold. This should not be > done, because a streaming join is very heavy and materializes both inputs > (which is fine for the VALUES input but not for the other). > There are two ways to solve this: > # don't translate IN to a JOIN at all > # translate it to a JOIN but have a special join strategy if one input is > bounded and final (non-updating) > Option 1 should be easy to do; option 2 requires much more effort. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)