[GitHub] [flink] dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280663142

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala

## @@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {

```scala
    rels.map(_.asInstanceOf[ExecNode[_, _]])
  }

  /**
    * Register a [[ReadableCatalog]] under a unique name.
    *
    * @param name the name under which the catalog will be registered
    * @param catalog the catalog to register
    * @throws CatalogAlreadyExistsException thrown if the catalog already exists
    */
  @throws[CatalogAlreadyExistsException]
  def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
    catalogManager.registerCatalog(name, catalog)
  }

  /**
    * Get a registered [[ReadableCatalog]].
    *
    * @param catalogName name of the catalog to get
    * @return the requested catalog
    * @throws CatalogNotExistException thrown if the catalog doesn't exist
    */
  @throws[CatalogNotExistException]
  def getCatalog(catalogName: String): ReadableCatalog = {
    catalogManager.getCatalog(catalogName)
  }

  /**
    * Get the current catalog.
    *
    * @return the current catalog in CatalogManager
    */
  def getCurrentCatalog(): ReadableCatalog = {
    catalogManager.getCurrentCatalog
  }

  /**
    * Get the current database name.
    *
    * @return the current database of the current catalog
    */
  def getCurrentDatabaseName(): String = {
    catalogManager.getCurrentCatalog.getCurrentDatabase
  }

  /**
    * Set the current catalog.
    *
    * @param name name of the catalog to set as current catalog
    * @throws CatalogNotExistException thrown if the catalog doesn't exist
    */
  @throws[CatalogNotExistException]
  def setCurrentCatalog(name: String): Unit = {
    catalogManager.setCurrentCatalog(name)
  }

  /**
    * Set the current catalog and current database.
    *
    * @param catalogName name of the catalog to set as current catalog
    * @param databaseName name of the database to set as current database
    * @throws CatalogNotExistException thrown if the catalog doesn't exist
    * @throws DatabaseNotExistException thrown if the database doesn't exist
    */
  @throws[CatalogNotExistException]
  @throws[DatabaseNotExistException]
  def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
    catalogManager.setCurrentCatalog(catalogName)
    catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
```

Review comment: There can only be a single current database per session, not one current database per catalog, so we don't need a `Map` in `CatalogManager`. Have you checked what the SQL standard says about it? As I understand it (and checked), there is always a default prefix per session (either a catalog, or a catalog plus a schema, which is equivalent to a database), so a current database is not set per catalog. What if the same catalog is used by multiple users for different sessions? How do you retrieve the original default database for a new session? As I said originally, this is a property of a session and I strongly believe it should not be stored in the catalog. WDYT?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
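The review debate above — whether the "current database" is session state in the manager or a property of each catalog — can be illustrated with a minimal sketch of the reviewer's suggestion. This is a hypothetical, simplified class (`SessionCatalogManager` and its method names are inventions for illustration, not Flink's actual API): the current catalog and current database live as two plain fields in the session-scoped manager, so no per-catalog `Map` of current databases is needed and the catalog objects themselves stay session-agnostic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: "current catalog" and "current database" are
// session state in the manager; the registered catalog objects are never
// mutated, so the same catalog can back many sessions safely.
class SessionCatalogManager {
    private final Map<String, Object> catalogs = new HashMap<>();
    private String currentCatalog;
    private String currentDatabase;

    void registerCatalog(String name, Object catalog) {
        if (catalogs.containsKey(name)) {
            throw new IllegalStateException("Catalog " + name + " already exists");
        }
        catalogs.put(name, catalog);
    }

    void setCurrentDatabase(String catalogName, String databaseName) {
        if (!catalogs.containsKey(catalogName)) {
            throw new IllegalArgumentException("Catalog " + catalogName + " does not exist");
        }
        // Both settings are session state; the catalog object is untouched.
        this.currentCatalog = catalogName;
        this.currentDatabase = databaseName;
    }

    String getCurrentCatalog() { return currentCatalog; }
    String getCurrentDatabase() { return currentDatabase; }

    public static void main(String[] args) {
        SessionCatalogManager m = new SessionCatalogManager();
        m.registerCatalog("builtin", new Object());
        m.setCurrentDatabase("builtin", "default");
        System.out.println(m.getCurrentCatalog() + "." + m.getCurrentDatabase()); // prints builtin.default
    }
}
```

A second session would simply construct its own manager over the same shared catalog objects, which answers the reviewer's "how do you retrieve the original default DB for a new session" concern: there is nothing to retrieve, each session starts from its own defaults.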
[GitHub] [flink] flinkbot commented on issue #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
flinkbot commented on issue #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#issuecomment-488921461

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress
* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] bowenli86 opened a new pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
bowenli86 opened a new pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337

## What is the purpose of the change
This PR creates HiveCatalog in the flink-connector-hive module and implements database related APIs for HiveCatalog.

## Brief change log
- Created HiveCatalog and HiveCatalogDatabase
- Abstracted the common part between HiveCatalog and GenericHiveMetastoreCatalog into HiveCatalogBase

## Verifying this change
This change added tests and can be verified as follows:
- Added HiveCatalogTest

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (docs) Documentation will be added later
[jira] [Updated] (FLINK-12232) Support database related operations in HiveCatalog
[ https://issues.apache.org/jira/browse/FLINK-12232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12232:
    Labels: pull-request-available (was: )

> Support database related operations in HiveCatalog
> Key: FLINK-12232
> URL: https://issues.apache.org/jira/browse/FLINK-12232
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Support database related operations in HiveCatalog, which implements the ReadableWritableCatalog API

This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280604389

## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java

## @@ -229,31 +231,97 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno

```diff
 	@Override
 	public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
 			throws TableNotExistException, CatalogException {
-		throw new UnsupportedOperationException();
+		try {
+			client.dropTable(
+				tablePath.getDatabaseName(),
+				tablePath.getObjectName(),
+				// Indicate whether associated data should be deleted.
+				// Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary
```

Review comment: Yeah, later when necessary, along with 'purge'.
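The `dropTable` contract under discussion — an `ignoreIfNotExists` flag that suppresses the error for missing tables, plus a flag controlling whether backing data is removed along with the metadata — can be sketched with a toy in-memory catalog. This is an illustrative stand-in (`ToyCatalog` is invented for this sketch), not the Hive-backed implementation from the diff:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative in-memory sketch of the catalog dropTable contract:
// 'ignoreIfNotExists' makes dropping a missing table a silent no-op
// instead of an error.
class ToyCatalog {
    private final Map<String, String> tables = new HashMap<>();

    void createTable(String path, String table) { tables.put(path, table); }

    void dropTable(String path, boolean ignoreIfNotExists) {
        if (!tables.containsKey(path)) {
            if (ignoreIfNotExists) {
                return; // silently succeed, mirroring the catalog API contract
            }
            throw new IllegalArgumentException("Table " + path + " does not exist");
        }
        tables.remove(path);
    }

    boolean tableExists(String path) { return tables.containsKey(path); }

    public static void main(String[] args) {
        ToyCatalog c = new ToyCatalog();
        c.createTable("db1.t1", "t1");
        c.dropTable("db1.missing", true); // ignored, no exception
        c.dropTable("db1.t1", false);
        System.out.println(c.tableExists("db1.t1")); // prints false
    }
}
```

In the real Hive-backed version the extra `deleteData` (and eventually `purge`) argument passed to the metastore client would decide whether the table's files are removed too; here the sketch only models the metadata side.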
[jira] [Updated] (FLINK-12395) Reconcile description and detailed description in CatalogBaseTable
[ https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-12395:
    Description:
CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} APIs, which don't seem to make much sense. I'm not sure what the use case of a detailed description is, or how users should specify it in SQL and the Table API. We should probably remove the detailed description and rename "description" to "comment". Besides, for simplicity, we should consider just treating "comment" as a property in the properties map.

    was:
CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} APIs, which don't seem to make much sense. I'm not sure what the use case of a detailed description is, or how users should specify it in SQL and the Table API. We should probably remove the detailed description and treat the description as a comment.

> Reconcile description and detailed description in CatalogBaseTable
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
> Issue Type: Sub-task
> Reporter: Bowen Li
> Priority: Major
> Fix For: 1.9.0
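The simplification proposed in the issue — treating "comment" as just another entry in the table's properties map rather than a dedicated description API — can be sketched as follows. `ToyCatalogTable` and its `COMMENT_KEY` are hypothetical names for illustration, not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of FLINK-12395's proposal: the table comment is an
// ordinary entry in the properties map instead of a dedicated
// 'description'/'detailedDescription' pair.
class ToyCatalogTable {
    static final String COMMENT_KEY = "comment";
    private final Map<String, String> properties;

    ToyCatalogTable(Map<String, String> properties) {
        this.properties = new HashMap<>(properties);
    }

    String getComment() {
        // Absent comment falls back to an empty string rather than a
        // placeholder description.
        return properties.getOrDefault(COMMENT_KEY, "");
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(COMMENT_KEY, "user activity table");
        System.out.println(new ToyCatalogTable(props).getComment()); // prints: user activity table
    }
}
```

One appeal of this design is that the comment then round-trips through any code path that already serializes the properties map, with no extra field to carry.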
[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280606701

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java

## @@ -41,28 +41,28 @@

```diff
 	private final List partitionKeys;
 	// Properties of the table
 	private final Map properties;
-	// Comment of the table
-	private String comment = "This is a generic catalog table.";
+	// Description of the table
+	private String description = "This is a generic catalog table.";
```

Review comment: reverted. https://issues.apache.org/jira/browse/FLINK-12395
[jira] [Created] (FLINK-12395) Reconcile description and detailed description in CatalogBaseTable
Bowen Li created FLINK-12395:

Summary: Reconcile description and detailed description in CatalogBaseTable
Key: FLINK-12395
URL: https://issues.apache.org/jira/browse/FLINK-12395
Project: Flink
Issue Type: Sub-task
Reporter: Bowen Li
Fix For: 1.9.0

CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} APIs, which don't seem to make much sense. I'm not sure what the use case of a detailed description is, or how users should specify it in SQL and the Table API. We should probably remove the detailed description and treat the description as a comment.
[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280525021

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FlinkCatalogManager.java

## @@ -0,0 +1,124 @@

```diff
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager implementation for Flink.
+ * TODO: [FLINK-11275] Decouple CatalogManager with Calcite
+ * Ideally FlinkCatalogManager should be in the flink-table-api-java module,
+ * but it currently depends on Calcite, a dependency that flink-table-api-java doesn't have right now.
+ * We temporarily put FlinkCatalogManager in flink-table-planner-blink.
+ */
+public class FlinkCatalogManager implements CatalogManager {
+	private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalogManager.class);
+
+	public static final String BUILTIN_CATALOG_NAME = "builtin";
+
+	// The catalog to hold all registered and translated tables
+	// We disable caching here to prevent side effects
+	private CalciteSchema internalSchema = CalciteSchema.createRootSchema(false, false);
+	private SchemaPlus rootSchema = internalSchema.plus();
+
+	// A map between names and catalogs.
+	private Map catalogs;
+
+	// The name of the default catalog and schema
+	private String currentCatalogName;
+
+	public FlinkCatalogManager() {
+		LOG.info("Initializing FlinkCatalogManager");
+		catalogs = new HashMap<>();
+
+		GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog(BUILTIN_CATALOG_NAME);
```

Review comment: Good question. This PR only partially reflects the integration of `CatalogManager` and `TableEnv`. We propose adding new APIs to `TableEnv` to read/write catalog metadata (which will be a different set of JIRAs). If users try to write to a `ReadableCatalog`, a catalog exception will be thrown to remind them of that. E.g. for CatalogBaseTable, we'll have

```
// can have overloaded methods or a different signature to register table in a catalog different than the current one
void registerTable(String name, CatalogBaseTable table, boolean ignoreIfExists) {
	if (!(catalogManager.getCurrentCatalog() instanceof ReadableWritableCatalog)) {
		throw CatalogException("xxx is a ReadableCatalog only")
	}

	catalog = (ReadableWritableCatalog) catalogManager.getCurrentCatalog()

	catalog.createTable(new ObjectPath(catalog.getCurrentDb(), name), table, ignoreIfExists)
}
```

`ReadableCatalog` and `ReadableWritableCatalog` are inherited from the original `ExternalCatalog` and `CrudExternalCatalog`; more context in [FLIP-30](https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs). I believe it's good to have `ReadableCatalog`: some catalog implementations sometimes can't, or shouldn't, support writing. E.g., at least for now I don't see that the Confluent Registry catalog ([FLINK-12256](https://issues.apache.org/jira/browse/FLINK-12256)) needs to support writing, since writing through the Confluent Registry itself and its UI is much more convenient and user friendly.
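The read-only vs. read-write split described in the comment above can be sketched with plain Java interfaces. The names mirror the FLIP-30 `ReadableCatalog`/`ReadableWritableCatalog` terminology, but the interfaces and the `InMemoryCatalog`/`TableRegistrar` helpers are simplified stand-ins invented for this sketch, not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

// A read-only catalog exposes metadata lookups only.
interface ReadableCatalog {
    String getCurrentDatabase();
}

// A read-write catalog additionally supports DDL-style writes.
interface ReadableWritableCatalog extends ReadableCatalog {
    void createTable(String path, Object table, boolean ignoreIfExists);
}

class InMemoryCatalog implements ReadableWritableCatalog {
    final Map<String, Object> tables = new HashMap<>();
    public String getCurrentDatabase() { return "default"; }
    public void createTable(String path, Object table, boolean ignoreIfExists) {
        if (tables.containsKey(path) && !ignoreIfExists) {
            throw new IllegalStateException("Table " + path + " already exists");
        }
        tables.putIfAbsent(path, table);
    }
}

class TableRegistrar {
    private final ReadableCatalog current;
    TableRegistrar(ReadableCatalog current) { this.current = current; }

    void registerTable(String name, Object table, boolean ignoreIfExists) {
        // Writes through a read-only catalog are rejected up front,
        // mirroring the instanceof check in the comment's pseudocode.
        if (!(current instanceof ReadableWritableCatalog)) {
            throw new IllegalStateException("current catalog is a ReadableCatalog only");
        }
        ReadableWritableCatalog writable = (ReadableWritableCatalog) current;
        writable.createTable(writable.getCurrentDatabase() + "." + name, table, ignoreIfExists);
    }

    public static void main(String[] args) {
        InMemoryCatalog cat = new InMemoryCatalog();
        new TableRegistrar(cat).registerTable("t1", new Object(), false);
        System.out.println(cat.tables.keySet()); // prints [default.t1]
    }
}
```

A catalog like the Confluent Registry one mentioned above would simply implement `ReadableCatalog` alone, and any attempt to register a table through it fails with a clear error instead of a half-supported write path.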
[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280600786

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala

## @@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {

```scala
    rels.map(_.asInstanceOf[ExecNode[_, _]])
  }

  /**
    * Register a [[ReadableCatalog]] under a unique name.
    *
    * @param name the name under which the catalog will be registered
    * @param catalog the catalog to register
    * @throws CatalogAlreadyExistsException thrown if the catalog already exists
    */
  @throws[CatalogAlreadyExistsException]
  def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
    catalogManager.registerCatalog(name, catalog)
  }

  /**
    * Get a registered [[ReadableCatalog]].
    *
    * @param catalogName name of the catalog to get
    * @return the requested catalog
    * @throws CatalogNotExistException thrown if the catalog doesn't exist
    */
  @throws[CatalogNotExistException]
  def getCatalog(catalogName: String): ReadableCatalog = {
    catalogManager.getCatalog(catalogName)
  }

  /**
    * Get the current catalog.
    *
    * @return the current catalog in CatalogManager
    */
  def getCurrentCatalog(): ReadableCatalog = {
    catalogManager.getCurrentCatalog
  }

  /**
    * Get the current database name.
    *
    * @return the current database of the current catalog
    */
  def getCurrentDatabaseName(): String = {
    catalogManager.getCurrentCatalog.getCurrentDatabase
  }

  /**
    * Set the current catalog.
    *
    * @param name name of the catalog to set as current catalog
    * @throws CatalogNotExistException thrown if the catalog doesn't exist
    */
  @throws[CatalogNotExistException]
  def setCurrentCatalog(name: String): Unit = {
    catalogManager.setCurrentCatalog(name)
  }

  /**
    * Set the current catalog and current database.
    *
    * @param catalogName name of the catalog to set as current catalog
    * @param databaseName name of the database to set as current database
    * @throws CatalogNotExistException thrown if the catalog doesn't exist
    * @throws DatabaseNotExistException thrown if the database doesn't exist
    */
  @throws[CatalogNotExistException]
  @throws[DatabaseNotExistException]
  def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
    catalogManager.setCurrentCatalog(catalogName)
    catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
```

Review comment: "default database" is only the default value of "current database", and yes, "current database" is a session property. I don't feel strongly about where to store the "current database". I prefer this way because 1) since every catalog can have such a property, we would otherwise need a map in CatalogManager to maintain the mapping, and 2) it matches well with [catalog configs in the SQL Client yaml file](https://docs.google.com/document/d/1ALxfiGZBaZ8KUNJtoT443hReoPoJEdt9Db2wiJ2LuWA/edit?usp=sharing).
[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932 ]

Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:28 PM:

Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

{code:scala}
override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)
}
{code}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map.

was (Author: cre...@gmail.com):

Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

{code:scala}
override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)
  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}
{code}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext is trying to add the accumulator to an immutable empty map.

> change to MockStreamTask breaks OneInputStreamOperatorTestHarness
> Key: FLINK-12334
> URL: https://issues.apache.org/jira/browse/FLINK-12334
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.8.0
> Reporter: Cliff Resnick
> Priority: Major
>
> The MockStreamTask that the test harness is created with does not include initialization of an accumulator map when using the builder:
> [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]
> This results in a TestHarness whose context contains an immutable empty map, breaking tests. The fix is simple: please include an actual map in the builder call.
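The failure mode described in FLINK-12334 is reproducible with plain Java, independent of Flink: `Collections.emptyMap()` returns an immutable map that throws `UnsupportedOperationException` on `put`, so any accumulator registration backed by it fails at runtime. The `AccumulatorRegistry` class below is a minimal invented stand-in for the mock task's accumulator storage, not Flink's actual class; the proposed fix corresponds to constructing it with a mutable `HashMap`:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Minimal reproduction of the FLINK-12334 failure mode: inserting into an
// immutable empty map throws, while a mutable HashMap (the proposed fix)
// accepts the accumulator.
class AccumulatorRegistry {
    private final Map<String, Object> accumulators;

    AccumulatorRegistry(Map<String, Object> accumulators) {
        this.accumulators = accumulators;
    }

    void addAccumulator(String name, Object acc) {
        accumulators.put(name, acc); // fails if the backing map is immutable
    }

    public static void main(String[] args) {
        AccumulatorRegistry broken = new AccumulatorRegistry(Collections.emptyMap());
        try {
            broken.addAccumulator("combiner-in", new Object());
        } catch (UnsupportedOperationException e) {
            System.out.println("immutable map rejected the accumulator");
        }
        AccumulatorRegistry fixed = new AccumulatorRegistry(new HashMap<>());
        fixed.addAccumulator("combiner-in", new Object()); // succeeds with a mutable map
    }
}
```

This is why the exception only surfaces when an operator's `open()` actually calls `addAccumulator`: the immutable map is harmless until the first write.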
[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness
[ https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932 ] Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:27 PM:
---
Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

{code:scala}
override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)
  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}
{code}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the MockStreamTask returned by getRuntimeContext tries to add the accumulator to an immutable empty map.

> change to MockStreamTask breaks OneInputStreamOperatorTestHarness
> -----------------------------------------------------------------
>
>                 Key: FLINK-12334
>                 URL: https://issues.apache.org/jira/browse/FLINK-12334
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.8.0
>            Reporter: Cliff Resnick
>            Priority: Major
>
> The MockStreamTask that the test harness is created with does not include initialization of an accumulator map when using the builder:
> [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]
> This results in a TestHarness whose context contains an immutable empty map, which breaks tests. The fix is simple: please include an actual map in the builder call.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
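The failure mode described above reduces to a mutable-vs-immutable map distinction. The following standalone sketch (class and method names are illustrative, not Flink's) reproduces what `addAccumulator` runs into when the mock task is built with an immutable empty map, and shows why passing a real map fixes it:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorMapDemo {

    // Attempts the same operation addAccumulator performs internally:
    // putting an entry into the task's accumulator map.
    static boolean tryAdd(Map<String, Object> accumulators) {
        try {
            accumulators.put("combiner-in", new Object());
            return true;
        } catch (UnsupportedOperationException e) {
            // This is what surfaces when the mock hands out an immutable map.
            return false;
        }
    }

    public static void main(String[] args) {
        // Broken case: an immutable empty map, as the 1.8 mock provides.
        System.out.println(tryAdd(Collections.emptyMap())); // false
        // Proposed fix: pass a real, mutable map to the builder.
        System.out.println(tryAdd(new HashMap<>()));        // true
    }
}
```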
[GitHub] [flink] Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix bug in statsd exporter when using negative values
Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix bug in statsd exporter when using negative values URL: https://github.com/apache/flink/pull/8259#discussion_r280538063 ## File path: flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java ## @@ -216,4 +238,14 @@ public String filterCharacters(String input) { return chars == null ? input : new String(chars, 0, pos); } + + private boolean numberIsNegative(Number input) { + try { + return new BigDecimal(input.toString()).compareTo(BigDecimal.ZERO) == -1; Review comment: Yeah good call, in this case the rounding that doubleValue might do wouldn't matter anyhow. Changed it! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
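The exchange above concerns whether `doubleValue()` is a safe substitute for the `BigDecimal` comparison. A minimal sketch (class and method names are illustrative, not the reporter's actual code) comparing the two checks:

```java
import java.math.BigDecimal;

public class NegativeCheck {

    // Original approach: exact, but allocates a BigDecimal per call.
    static boolean viaBigDecimal(Number input) {
        return new BigDecimal(input.toString()).compareTo(BigDecimal.ZERO) < 0;
    }

    // Reviewer's suggestion: doubleValue() may round, but for metric values
    // in a realistic range rounding cannot flip the sign (extreme underflow
    // to -0.0 aside), so the cheaper check is equivalent for this purpose.
    static boolean viaDoubleValue(Number input) {
        return input.doubleValue() < 0;
    }

    public static void main(String[] args) {
        System.out.println(viaBigDecimal(-0.5));  // true
        System.out.println(viaDoubleValue(-0.5)); // true
        System.out.println(viaDoubleValue(42L));  // false
    }
}
```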
[GitHub] [flink] dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema URL: https://github.com/apache/flink/pull/8214#discussion_r280532280 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FlinkCatalogManager.java ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.api.CatalogAlreadyExistsException; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.util.StringUtils; + +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.schema.SchemaPlus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager implementation for Flink. + * TODO: [FLINK-11275] Decouple CatalogManager with Calcite + * Idealy FlinkCatalogManager should be in flink-table-api-java module. 
+ * But it currently depends on Calcite, a dependency that flink-table-api-java doesn't have right now. + * We temporarily put FlinkCatalogManager in flink-table-planner-blink. + */ +public class FlinkCatalogManager implements CatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalogManager.class); + + public static final String BUILTIN_CATALOG_NAME = "builtin"; + + // The catalog to hold all registered and translated tables + // We disable caching here to prevent side effects + private CalciteSchema internalSchema = CalciteSchema.createRootSchema(false, false); + private SchemaPlus rootSchema = internalSchema.plus(); + + // A map between names and catalogs. + private Map catalogs; + + // The name of the default catalog and schema + private String currentCatalogName; + + public FlinkCatalogManager() { + LOG.info("Initializing FlinkCatalogManager"); + catalogs = new HashMap<>(); + + GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog(BUILTIN_CATALOG_NAME); Review comment: I agree that there might be implementations that, for the time being, do not support writing and might throw an exception. In general, though, I think all implementations do support writing in some way. The owner of a platform might decide to disallow writing, but that should rather be done with some sort of access roles or properties, etc. In the end, all catalogs do support writing; otherwise there would be no way to enter data into them. The method you mentioned, `registerTable`, is part of `ReadableWritableCatalog`, not `ReadableCatalog`. If the `CatalogManager` works only with `ReadableCatalog`, we would have to perform instance checks and hard casts all the time. I see no reason for that. I believe we can just have a single `Catalog` interface that throws an exception if it should not be modified, or that exposes a flag such as `isMutable`.
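The single-interface design dawidwys proposes can be sketched as follows. This is a hypothetical illustration of the `isMutable` flag idea, not Flink's actual catalog API; all names here are invented for the sketch:

```java
public class SingleCatalogSketch {

    // One interface instead of ReadableCatalog/ReadableWritableCatalog:
    // mutability is a queryable property rather than a separate type.
    interface Catalog {
        boolean isMutable();

        // The write path guards itself, so callers need no
        // instanceof checks or hard casts.
        default void createTable(String path) {
            if (!isMutable()) {
                throw new UnsupportedOperationException("Catalog is read-only");
            }
            // mutable implementations would override or extend this
        }
    }

    static class ReadOnlyCatalog implements Catalog {
        @Override
        public boolean isMutable() {
            return false;
        }
    }

    public static void main(String[] args) {
        Catalog catalog = new ReadOnlyCatalog();
        try {
            catalog.createTable("db1.t1");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage()); // rejected: Catalog is read-only
        }
    }
}
```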
[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema URL: https://github.com/apache/flink/pull/8214#discussion_r280525021 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FlinkCatalogManager.java ## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.table.api.CatalogAlreadyExistsException; +import org.apache.flink.table.api.CatalogNotExistException; +import org.apache.flink.util.StringUtils; + +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.schema.SchemaPlus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A CatalogManager implementation for Flink. + * TODO: [FLINK-11275] Decouple CatalogManager with Calcite + * Idealy FlinkCatalogManager should be in flink-table-api-java module. 
+ * But due to that it currently depends on Calcite, a dependency that flink-table-api-java doesn't have right now. + * We temporarily put FlinkCatalogManager in flink-table-planner-blink. + */ +public class FlinkCatalogManager implements CatalogManager { + private static final Logger LOG = LoggerFactory.getLogger(FlinkCatalogManager.class); + + public static final String BUILTIN_CATALOG_NAME = "builtin"; + + // The catalog to hold all registered and translated tables + // We disable caching here to prevent side effects + private CalciteSchema internalSchema = CalciteSchema.createRootSchema(false, false); + private SchemaPlus rootSchema = internalSchema.plus(); + + // A map between names and catalogs. + private Map catalogs; + + // The name of the default catalog and schema + private String currentCatalogName; + + public FlinkCatalogManager() { + LOG.info("Initializing FlinkCatalogManager"); + catalogs = new HashMap<>(); + + GenericInMemoryCatalog inMemoryCatalog = new GenericInMemoryCatalog(BUILTIN_CATALOG_NAME); Review comment: Good question, this PR only partially reflects the integration of `CatalogManager` and `TableEnv`. We propose adding new APIs to `TableEnv` to read/write catalog metadata. If users try to write to a `ReadableCatalog`, a catalog exception will be thrown to remind them that. E.g. 
for CatalogBaseTable, we'll have ``` // can have overloaded methods or a different signature to register a table in a catalog different from the current one void registerTable(String name, CatalogBaseTable table, boolean ignoreIfExists) { if (!(catalogManager.getCurrentCatalog() instanceof ReadableWritableCatalog)) { throw new CatalogException("xxx is a ReadableCatalog only") } catalog = (ReadableWritableCatalog) catalogManager.getCurrentCatalog() catalog.createTable(new ObjectPath(catalog.getCurrentDb(), name), table, ignoreIfExists) } ``` `ReadableCatalog` and `ReadableWritableCatalog` are inherited from the original `ExternalCatalog` and `CrudExternalCatalog`; more context in [FLIP-30](https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs). I believe it's good to have `ReadableCatalog`: some catalog implementations cannot, or need not, support writing. E.g., at least for now I don't see that the Confluent Registry catalog ([FLINK-12256](https://issues.apache.org/jira/browse/FLINK-12256)) needs to support writing, since writing through Confluent Registry itself and its UI is much more convenient and user friendly.
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280505199 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -229,31 +231,97 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno @Override public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { - throw new UnsupportedOperationException(); + try { + client.dropTable( + tablePath.getDatabaseName(), + tablePath.getObjectName(), + // Indicate whether associated data should be deleted. + // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary Review comment: I think what we can do later is to expose this in the API, something like dropTable(ObjectPath, boolean, boolean deleteData).
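The follow-up API xuefuz suggests could take the shape below. This is a hypothetical sketch only: the signatures and the string-returning stub bodies are illustrative, not the actual GenericHiveMetastoreCatalog implementation:

```java
public class DropTableApiSketch {

    // Existing two-argument form delegates to a new overload that lets
    // callers decide whether associated data is deleted.
    static String dropTable(String tablePath, boolean ignoreIfNotExists) {
        // Preserve today's behavior: delete data, since Flink tables
        // shouldn't have data in Hive.
        return dropTable(tablePath, ignoreIfNotExists, true);
    }

    static String dropTable(String tablePath, boolean ignoreIfNotExists, boolean deleteData) {
        // Stub: a real implementation would call the metastore client here.
        return String.format("dropTable(%s, ignoreIfNotExists=%b, deleteData=%b)",
                tablePath, ignoreIfNotExists, deleteData);
    }

    public static void main(String[] args) {
        System.out.println(dropTable("db1.t1", true));
        System.out.println(dropTable("db1.t1", true, false));
    }
}
```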
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280508961 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java ## @@ -41,28 +41,28 @@ private final List partitionKeys; // Properties of the table private final Map properties; - // Comment of the table - private String comment = "This is a generic catalog table."; + // Description of the table + private String description = "This is a generic catalog table."; Review comment: Ok. Please revert and create a follow-up JIRA to track this.
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280508513 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java ## @@ -18,32 +18,179 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; - +import java.util.stream.Collectors; /** * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. */ public class GenericHiveMetastoreCatalogUtil { + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + + // Flink tables should be stored as 'external' tables in Hive metastore + private static final Map EXTERNAL_TABLE_PROPERTY = new HashMap() {{ + put("EXTERNAL", "TRUE"); + }}; + private GenericHiveMetastoreCatalogUtil() { } // -- Utils -- /** -* Creates a Hive database from CatalogDatabase. 
+* Creates a Hive database from a CatalogDatabase. +* +* @param databaseName name of the database +* @param catalogDatabase the CatalogDatabase instance +* @return a Hive database */ - public static Database createHiveDatabase(String dbName, CatalogDatabase db) { - Map props = db.getProperties(); + public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) { return new Database( - dbName, - db.getDescription().isPresent() ? db.getDescription().get() : null, + databaseName, + catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null, null, - props); + catalogDatabase.getProperties()); + } + + /** +* Creates a Hive table from a CatalogBaseTable. +* +* @param tablePath path of the table +* @param table the CatalogBaseTable instance +* @return a Hive table +*/ + public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + Map properties = new HashMap<>(table.getProperties()); + + // Table description + if (table.getDescription().isPresent()) { + properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get()); + } + + Table hiveTable = new Table(); + hiveTable.setDbName(tablePath.getDatabaseName()); + hiveTable.setTableName(tablePath.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); + + // Table properties + hiveTable.setParameters(buildFlinkProperties(properties)); + hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY); + + // Hive table's StorageDescriptor + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>())); + + List allColumns = createHiveColumns(table.getSchema()); + + // Table columns and partition keys + CatalogTable catalogTable = (CatalogTable) table; + + if (catalogTable.isPartitioned()) { + int partitionKeySize = catalogTable.getPartitionKeys().size(); + List regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize); + List 
partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size()); + + sd.setCols(regularColumns); +
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280475679 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ## @@ -162,10 +164,13 @@ void assignExclusiveSegments(List segments) { public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException { if (partitionRequestClient == null) { // Create a client and request the partition - partitionRequestClient = connectionManager - .createPartitionRequestClient(connectionId); + try { + partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId); + } catch (RemoteTransportException ex) { Review comment: Other `IOExceptions` are not relevant here?
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280471611 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ## @@ -183,11 +184,17 @@ public void run() { subpartitionView = checkAndWaitForSubpartitionView(); } - BufferAndBacklog next = subpartitionView.getNextBuffer(); + BufferAndBacklog next; + try { + next = subpartitionView.getNextBuffer(); + } catch (Throwable t) { Review comment: I think we should not catch `Throwable` here. Instead we should only catch the `IOException`.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280481421 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ## @@ -389,25 +401,80 @@ public void testGetNextAfterPartitionReleased() throws Exception { try { channel.getNextBuffer(); fail("Did not throw expected CancelTaskException"); - } catch (CancelTaskException ignored) { + } catch (DataConsumptionException ignored) { } channel.releaseAllResources(); assertFalse(channel.getNextBuffer().isPresent()); } + /** +* Tests the {@link PartitionNotFoundException} is thrown during requesting partition and +* wrapped into {@link DataConsumptionException}. +*/ + @Test + public void testDataConsumptionExceptionDuringPartitionRequest() throws Exception { + Tuple2 backoff = new Tuple2<>(0, 0); + + ResultPartitionManager partitionManager = new ResultPartitionManager(); + + ResultPartitionID partitionId = new ResultPartitionID(); + + LocalInputChannel ch = createLocalInputChannel( + InputChannelTestUtils.createSingleInputGate(1), partitionId, partitionManager, backoff); + + try { + ch.requestSubpartition(0); + fail("Did not throw expected DataConsumptionException"); + } catch (IOException ex) { + assertTrue(ExceptionUtils.findThrowable(ex, PartitionNotFoundException.class).isPresent()); + verifyDataConsumptionException(partitionId, ex); + } + } + + /** +* Tests the {@link DataConsumptionException} is wrapped when {@link ResultSubpartitionView#getNextBuffer()} +* throws an exception. 
+*/ + @Test + public void testDataConsumptionExceptionDuringGetNextBuffer() throws Exception { + Tuple2 backoff = new Tuple2<>(0, 0); + + ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.getNextBuffer()).thenThrow(new IOException("Expected test exception")); + + ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); + when(partitionManager + .createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class))) + .thenReturn(view); Review comment: Same here. Is it possible to do it without mocking?
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280480963 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ## @@ -260,13 +265,20 @@ public void testProducerFailedException() throws Exception { BufferProvider bufferProvider = mock(BufferProvider.class); when(inputGate.getBufferProvider()).thenReturn(bufferProvider); + ResultPartitionID partitionId = new ResultPartitionID(); LocalInputChannel ch = createLocalInputChannel( - inputGate, partitionManager, new Tuple2<>(0, 0)); + inputGate, partitionId, partitionManager, new Tuple2<>(0, 0)); ch.requestSubpartition(0); - // Should throw an instance of CancelTaskException. - ch.getNextBuffer(); + // Should throw an instance of DataConsumptionException. + try { + ch.getNextBuffer(); + fail("Did not throw expected DataConsumptionException"); + } catch (IOException ex) { Review comment: Can't we simply catch a `DataConsumptionException` here instead of calling `verifyDataConsumptionException`?
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280479507 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java ## @@ -383,9 +397,16 @@ private EmbeddedChannel createEmbeddedChannel() { return new EmbeddedChannel(protocol.getClientChannelHandlers()); } - private RemoteInputChannel addInputChannel(NetworkClientHandler clientHandler) - throws IOException { - RemoteInputChannel rich = createRemoteInputChannel(); + private RemoteInputChannel addInputChannel( + NetworkClientHandler clientHandler, + ResultPartitionID partitionId) throws Exception { + RemoteInputChannel rich = new RemoteInputChannel( + InputChannelTestUtils.createSingleInputGate(1), + 0, + partitionId, + mock(ConnectionID.class), Review comment: Can't we create a `ConnectionID` here?
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280482206 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ## @@ -333,6 +331,70 @@ public void testProducerFailedException() throws Exception { ch.getNextBuffer(); } + /** +* Tests that {@link DataConsumptionException} is wrapped if {@link RemoteTransportException} +* is thrown during creating {@link PartitionRequestClient}. +*/ + @Test + public void testDataConsumptionExceptionDuringCreatingClient() throws Exception { + final ConnectionManager connManager = mock(ConnectionManager.class); + when(connManager.createPartitionRequestClient(any(ConnectionID.class))) + .thenThrow(RemoteTransportException.class); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final RemoteInputChannel ch = createRemoteInputChannel(partitionId, connManager); + + try { + ch.requestSubpartition(0); + fail("Did not throw expected DataConsumptionException"); + } catch (IOException ex) { + verifyDataConsumptionException(partitionId, ex); + } + } + + /** +* Tests that {@link DataConsumptionException} is wrapped if an exception is thrown +* during requesting partition. 
+*/ + @Test + public void testDataConsumptionExceptionDuringPartitionRequest() throws Exception { + final ConnectionManager connManager = mock(ConnectionManager.class); + final PartitionRequestClient client = mock(PartitionRequestClient.class); + when(connManager.createPartitionRequestClient(any(ConnectionID.class))) + .thenReturn(client); + + final ResultPartitionID partitionId = new ResultPartitionID(); + final RemoteInputChannel ch = createRemoteInputChannel(partitionId, connManager); + when(client.requestSubpartition(partitionId, 0, ch, 0)).thenThrow(IOException.class); Review comment: Can we do these tests without the extensive mocking? I think this is better for maintainability.
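The review suggestion — replace `when(...).thenThrow(...)` mocks with hand-written test doubles — can be sketched in isolation. The interfaces below are simplified stand-ins for Flink's `ConnectionManager`/`PartitionRequestClient` (the real ones live in flink-runtime and have more methods); this only illustrates the stubbing pattern, not the actual Flink API.

```java
import java.io.IOException;

// Hypothetical one-method stand-in for Flink's PartitionRequestClient.
interface PartitionRequestClientLike {
    void requestSubpartition(int index) throws IOException;
}

// Hypothetical one-method stand-in for Flink's ConnectionManager.
interface ConnectionManagerLike {
    PartitionRequestClientLike createPartitionRequestClient() throws IOException;
}

public class StubbedConnectionManagerExample {

    // A hand-written stub replaces the Mockito setup: the failure behavior is
    // explicit in the class body and survives interface refactorings without
    // silently returning default values the way a partial mock can.
    static class FailingConnectionManager implements ConnectionManagerLike {
        @Override
        public PartitionRequestClientLike createPartitionRequestClient() throws IOException {
            throw new IOException("Connection failed (test stub)");
        }
    }

    // Exercises the stub and reports the observed failure message.
    public static String describeFailure(ConnectionManagerLike manager) {
        try {
            manager.createPartitionRequestClient();
            return "no failure";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        assert describeFailure(new FailingConnectionManager()).equals("Connection failed (test stub)");
    }
}
```

The maintainability argument in the comment is that a named stub documents intent at the type level, whereas a chain of `when(...)` calls has to be re-read to discover what the test double actually does.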
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280481073 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ## @@ -389,25 +401,80 @@ public void testGetNextAfterPartitionReleased() throws Exception { try { channel.getNextBuffer(); fail("Did not throw expected CancelTaskException"); - } catch (CancelTaskException ignored) { + } catch (DataConsumptionException ignored) { } channel.releaseAllResources(); assertFalse(channel.getNextBuffer().isPresent()); } + /** +* Tests the {@link PartitionNotFoundException} is thrown during requesting partition and +* wrapped into {@link DataConsumptionException}. +*/ + @Test + public void testDataConsumptionExceptionDuringPartitionRequest() throws Exception { + Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0); + + ResultPartitionManager partitionManager = new ResultPartitionManager(); + + ResultPartitionID partitionId = new ResultPartitionID(); + + LocalInputChannel ch = createLocalInputChannel( + InputChannelTestUtils.createSingleInputGate(1), partitionId, partitionManager, backoff); + + try { + ch.requestSubpartition(0); + fail("Did not throw expected DataConsumptionException"); + } catch (IOException ex) { Review comment: Same here, why not simply catching a `DataConsumptionException`?
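The reviewer's point — catch the specific exception type rather than a generic `IOException` that then needs a cause-chain search — can be shown with a minimal stand-alone sketch. `DataConsumptionExceptionLike` below is a hypothetical local stand-in for the PR's `DataConsumptionException` (assumed here to extend `IOException` and carry the failed partition's id, per the surrounding test code).

```java
import java.io.IOException;

// Hypothetical stand-in for the PR's DataConsumptionException.
class DataConsumptionExceptionLike extends IOException {
    final String partitionId;

    DataConsumptionExceptionLike(String partitionId, Throwable cause) {
        super("Data consumption failed for partition " + partitionId, cause);
        this.partitionId = partitionId;
    }
}

public class DirectCatchExample {

    // The channel wraps the low-level failure into the dedicated exception type.
    static void requestSubpartition(String partitionId) throws IOException {
        throw new DataConsumptionExceptionLike(partitionId, new IOException("partition not found"));
    }

    public static String failedPartition(String partitionId) {
        try {
            requestSubpartition(partitionId);
            return null;
        } catch (DataConsumptionExceptionLike expected) {
            // Catching the specific type directly makes the test's expectation
            // explicit and removes the need to walk the cause chain of a
            // generic IOException (e.g. via ExceptionUtils.findThrowable).
            return expected.partitionId;
        } catch (IOException unexpected) {
            return "unexpected";
        }
    }

    public static void main(String[] args) {
        assert "p-1".equals(failedPartition("p-1"));
    }
}
```

If the wrapped cause still matters (here, `PartitionNotFoundException`), the test can assert on `expected.getCause()` after the narrow catch rather than searching the whole chain.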
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280480320 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ## @@ -389,25 +401,80 @@ public void testGetNextAfterPartitionReleased() throws Exception { try { channel.getNextBuffer(); fail("Did not throw expected CancelTaskException"); Review comment: update message.
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280481273 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ## @@ -389,25 +401,80 @@ public void testGetNextAfterPartitionReleased() throws Exception { try { channel.getNextBuffer(); fail("Did not throw expected CancelTaskException"); - } catch (CancelTaskException ignored) { + } catch (DataConsumptionException ignored) { } channel.releaseAllResources(); assertFalse(channel.getNextBuffer().isPresent()); } + /** +* Tests the {@link PartitionNotFoundException} is thrown during requesting partition and +* wrapped into {@link DataConsumptionException}. +*/ + @Test + public void testDataConsumptionExceptionDuringPartitionRequest() throws Exception { + Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0); + + ResultPartitionManager partitionManager = new ResultPartitionManager(); + + ResultPartitionID partitionId = new ResultPartitionID(); + + LocalInputChannel ch = createLocalInputChannel( + InputChannelTestUtils.createSingleInputGate(1), partitionId, partitionManager, backoff); + + try { + ch.requestSubpartition(0); + fail("Did not throw expected DataConsumptionException"); + } catch (IOException ex) { + assertTrue(ExceptionUtils.findThrowable(ex, PartitionNotFoundException.class).isPresent()); + verifyDataConsumptionException(partitionId, ex); + } + } + + /** +* Tests the {@link DataConsumptionException} is wrapped when {@link ResultSubpartitionView#getNextBuffer()} +* throws an exception.
+*/ + @Test + public void testDataConsumptionExceptionDuringGetNextBuffer() throws Exception { + Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0); + + ResultSubpartitionView view = mock(ResultSubpartitionView.class); + when(view.getNextBuffer()).thenThrow(new IOException("Expected test exception")); Review comment: Can we do this without mocking?
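A mock-free version of `when(view.getNextBuffer()).thenThrow(...)` is just an inline implementation of the view interface. The one-method `SubpartitionViewLike` below is a hypothetical reduction of Flink's `ResultSubpartitionView` (the real interface has several methods, which a shared test base class could default); the sketch only demonstrates the anonymous-implementation pattern the reviewer is asking for.

```java
import java.io.IOException;
import java.util.Optional;

// Hypothetical one-method stand-in for Flink's ResultSubpartitionView.
interface SubpartitionViewLike {
    Optional<byte[]> getNextBuffer() throws IOException;
}

public class AnonymousViewStubExample {

    // A lambda (or anonymous class) replaces the Mockito stubbing: the failing
    // behavior is written directly where the test double is created.
    public static SubpartitionViewLike failingView() {
        return () -> {
            throw new IOException("Expected test exception");
        };
    }

    public static void main(String[] args) {
        boolean threw = false;
        try {
            failingView().getNextBuffer();
        } catch (IOException e) {
            threw = "Expected test exception".equals(e.getMessage());
        }
        assert threw;
    }
}
```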
[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#discussion_r280473083 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java ## @@ -127,7 +128,7 @@ void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedE if (increaseBackoff()) { Review comment: What about other `IOExceptions` which might be thrown out of the `ResultPartitionManager#createSubpartitionView`?
[jira] [Comment Edited] (FLINK-12389) flink codegen set String type for ByteBuffer fields
[ https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831745#comment-16831745 ] Yu Yang edited comment on FLINK-12389 at 5/2/19 4:42 PM: - [~twalthr], [~fhueske] thanks for the reply! given that this issue is with legacy planner, shall we try with flink-table-planner-blink? any pointers on the current status and the community's plan on flink-table-planner-blink and flink-table-runtime-blink? [~ykt836], [~jinyu.zj] could you share any insights on this? ^^ was (Author: yuyang08): [~twalthr], [~fhueske] thanks for the reply! given that this issue is with legacy planner, shall we try with flink-table-planner-blink? any pointers on the current status and the community's plan on flink-table-planner-blink and flink-table-runtime-blink? > flink codegen set String type for ByteBuffer fields > --- > > Key: FLINK-12389 > URL: https://issues.apache.org/jira/browse/FLINK-12389 > Project: Flink > Issue Type: Bug > Components: Table SQL / Legacy Planner >Affects Versions: 1.8.0 >Reporter: Yu Yang >Priority: Major > > We try to write a simple flink sql program using "select .. from " > statement, and encounter a compile exception. > *Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column > 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"* > Further debugging shows that the following flink-generated code snippet > caused problem: > {code} > final java.lang.reflect.Field > field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds = > org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField( > com.pinterest.utzv2.thrift.RealtimeSpendValue.class, "segmentIds"); > ... 
> boolean isNull$5 = (java.nio.ByteBuffer) > field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1) == > null; > java.lang.String result$4; > if (isNull$5) { > result$4 = ""; > } > else { > result$4 = (java.lang.String) (java.nio.ByteBuffer) > field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1); > } > > {code} > The following is the stack track: > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 17 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) > at > org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:425) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column > 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) > at 
org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049) > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) > at org.codehaus.janino.Java$Cast.accept(Java.java:4887) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790) > at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752) > at > org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732) > at org.codehaus.janino.Java$Assignment.accept(Java.java:4466) > at
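The generated snippet in the bug report casts `java.nio.ByteBuffer` straight to `java.lang.String`, which Janino rightly rejects at compile time. A correct conversion has to decode the buffer's bytes through an explicit charset. The sketch below shows that decoding in plain JDK code; the choice of UTF-8 is an assumption, since the report does not state the Thrift field's actual encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferToStringExample {

    // What the generated code would have to do instead of
    // (java.lang.String) (java.nio.ByteBuffer) field.get(in1):
    public static String decode(ByteBuffer buffer) {
        if (buffer == null) {
            // Mirrors the generated null branch, which substitutes "".
            return "";
        }
        // decode() respects the buffer's position/limit and does not require a
        // backing array; duplicate() keeps the caller's position untouched.
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap("segment-1".getBytes(StandardCharsets.UTF_8));
        assert decode(buf).equals("segment-1");
        assert decode(null).equals("");
    }
}
```

The underlying planner issue is that codegen mapped the `ByteBuffer` field to a `String` type; fixing the type mapping, not inserting a cast, is the real resolution, so this block only illustrates why the emitted cast can never succeed.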
[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8329#discussion_r280503007 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java ## @@ -18,32 +18,179 @@ package org.apache.flink.table.catalog.hive; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.GenericCatalogTable; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.plan.stats.TableStats; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; - +import java.util.stream.Collectors; /** * Utils to convert meta objects between Flink and Hive for GenericHiveMetastoreCatalog. */ public class GenericHiveMetastoreCatalogUtil { + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + + // Flink tables should be stored as 'external' tables in Hive metastore + private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new HashMap<String, String>() {{ + put("EXTERNAL", "TRUE"); + }}; + private GenericHiveMetastoreCatalogUtil() { } // -- Utils -- /** -* Creates a Hive database from CatalogDatabase. +* Creates a Hive database from a CatalogDatabase. +* +* @param databaseName name of the database +* @param catalogDatabase the CatalogDatabase instance +* @return a Hive database */ - public static Database createHiveDatabase(String dbName, CatalogDatabase db) { - Map<String, String> props = db.getProperties(); + public static Database createHiveDatabase(String databaseName, CatalogDatabase catalogDatabase) { return new Database( - dbName, - db.getDescription().isPresent() ? db.getDescription().get() : null, + databaseName, + catalogDatabase.getDescription().isPresent() ? catalogDatabase.getDescription().get() : null, null, - props); + catalogDatabase.getProperties()); + } + + /** +* Creates a Hive table from a CatalogBaseTable. +* +* @param tablePath path of the table +* @param table the CatalogBaseTable instance +* @return a Hive table +*/ + public static Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { + Map<String, String> properties = new HashMap<>(table.getProperties()); + + // Table description + if (table.getDescription().isPresent()) { + properties.put(HiveTableConfig.TABLE_DESCRITPION, table.getDescription().get()); + } + + Table hiveTable = new Table(); + hiveTable.setDbName(tablePath.getDatabaseName()); + hiveTable.setTableName(tablePath.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); + + // Table properties + hiveTable.setParameters(buildFlinkProperties(properties)); + hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY); + + // Hive table's StorageDescriptor + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>())); + + List<FieldSchema> allColumns = createHiveColumns(table.getSchema()); + + // Table columns and partition keys + CatalogTable catalogTable = (CatalogTable) table; + + if (catalogTable.isPartitioned()) { + int partitionKeySize = catalogTable.getPartitionKeys().size(); + List<FieldSchema> regularColumns = allColumns.subList(0, allColumns.size() - partitionKeySize); + List<FieldSchema> partitionColumns = allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size()); + + sd.setCols(regularColumns); +
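The diff namespaces Flink-created table properties with a `flink.` prefix so they cannot collide with keys the Hive metastore adds on its own. A minimal sketch of that round trip is below; `buildFlinkProperties` in the PR is the real counterpart, and the method names here are illustrative stand-ins, not Flink's actual API.

```java
import java.util.Map;
import java.util.stream.Collectors;

public class FlinkPropertyPrefixExample {

    static final String FLINK_PROPERTY_PREFIX = "flink.";

    // Writing side: prefix every Flink property before storing it as a Hive
    // table parameter, mirroring the buildFlinkProperties idea from the diff.
    public static Map<String, String> toHiveParameters(Map<String, String> flinkProps) {
        return flinkProps.entrySet().stream().collect(Collectors.toMap(
                e -> FLINK_PROPERTY_PREFIX + e.getKey(),
                Map.Entry::getValue));
    }

    // Reading side: recover only the Flink-owned keys, dropping parameters
    // that Hive itself created (e.g. EXTERNAL=TRUE).
    public static Map<String, String> fromHiveParameters(Map<String, String> hiveParams) {
        return hiveParams.entrySet().stream()
                .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
                .collect(Collectors.toMap(
                        e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()),
                        Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> hive = toHiveParameters(Map.of("connector", "kafka"));
        assert hive.containsKey("flink.connector");
        assert fromHiveParameters(hive).get("connector").equals("kafka");
    }
}
```

The prefix makes the stored parameters self-describing: any key without `flink.` is Hive's, any key with it belongs to Flink, and migrations between metastore versions cannot silently overwrite Flink metadata.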
[GitHub] [flink] flinkbot commented on issue #8336: [FLINK-12305][docs][table] Improve Table API/SQL time attribute documentation.
flinkbot commented on issue #8336: [FLINK-12305][docs][table] Improve Table API/SQL time attribute documentation. URL: https://github.com/apache/flink/pull/8336#issuecomment-488744275 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12305) Table API Clarification
[ https://issues.apache.org/jira/browse/FLINK-12305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12305: --- Labels: pull-request-available (was: ) > Table API Clarification > --- > > Key: FLINK-12305 > URL: https://issues.apache.org/jira/browse/FLINK-12305 > Project: Flink > Issue Type: Improvement > Components: Project Website >Affects Versions: 1.8.0 >Reporter: Alex Barnes >Assignee: Fabian Hueske >Priority: Minor > Labels: pull-request-available > > It is not clear from the documentation if late arriving data is correctly > handled in the Flink Table/SQL APIs. The documentation makes passing > reference to recognizing late arriving data, but does not go into depth as to > what kind of triggering/processing can be performed on it > [https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time] > This old email thread on the apache-flink-users mailing list tells a > different story - specifically that late arriving data is not supported and > DataStream APIs need to be used instead: > [http://osdir.com/apache-flink-users/msg08110.html] > Has support been added since that email correspondence? Please consider > reducing ambiguity in the documentation and update it to better reflect the > current/planned state of support for late arriving data in the Table API. > Thanks, > Alex -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] fhueske opened a new pull request #8336: [FLINK-12305][docs][table] Improve Table API/SQL time attribute documentation.
fhueske opened a new pull request #8336: [FLINK-12305][docs][table] Improve Table API/SQL time attribute documentation. URL: https://github.com/apache/flink/pull/8336 ## Contribution Checklist ## What is the purpose of the change * Improve introduction of processing time and event time support of Table API / SQL.
[jira] [Commented] (FLINK-12389) flink codegen set String type for ByteBuffer fields
[ https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831745#comment-16831745 ] Yu Yang commented on FLINK-12389: - [~twalthr], [~fhueske] thanks for the reply! given that this issue is with legacy planner, shall we try with flink-table-planner-blink? any pointers on the current status and the community's plan on flink-table-planner-blink and flink-table-runtime-blink? > flink codegen set String type for ByteBuffer fields > --- > > Key: FLINK-12389 > URL: https://issues.apache.org/jira/browse/FLINK-12389 > Project: Flink > Issue Type: Bug > Components: Table SQL / Legacy Planner >Affects Versions: 1.8.0 >Reporter: Yu Yang >Priority: Major > > We try to write a simple flink sql program using "select .. from " > statement, and encounter a compile exception. > *Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column > 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"* > Further debugging shows that the following flink-generated code snippet > caused problem: > {code} > final java.lang.reflect.Field > field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds = > org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField( > com.pinterest.utzv2.thrift.RealtimeSpendValue.class, "segmentIds"); > ... > boolean isNull$5 = (java.nio.ByteBuffer) > field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1) == > null; > java.lang.String result$4; > if (isNull$5) { > result$4 = ""; > } > else { > result$4 = (java.lang.String) (java.nio.ByteBuffer) > field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1); > } > > {code} > The following is the stack track: > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265) > ... 
17 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) > at > org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) > at > org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:425) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column > 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049) > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416) > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394) > at org.codehaus.janino.Java$Cast.accept(Java.java:4887) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790) > at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752) > at > 
org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732) > at org.codehaus.janino.Java$Assignment.accept(Java.java:4466) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) > at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) >
[jira] [Assigned] (FLINK-12305) Table API Clarification
[ https://issues.apache.org/jira/browse/FLINK-12305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-12305: - Assignee: Fabian Hueske > Table API Clarification > --- > > Key: FLINK-12305 > URL: https://issues.apache.org/jira/browse/FLINK-12305 > Project: Flink > Issue Type: Improvement > Components: Project Website >Affects Versions: 1.8.0 >Reporter: Alex Barnes >Assignee: Fabian Hueske >Priority: Minor > > It is not clear from the documentation if late arriving data is correctly > handled in the Flink Table/SQL APIs. The documentation makes passing > reference to recognizing late arriving data, but does not go into depth as to > what kind of triggering/processing can be performed on it > [https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time] > This old email thread on the apache-flink-users mailing list tells a > different story - specifically that late arriving data is not supported and > DataStream APIs need to be used instead: > [http://osdir.com/apache-flink-users/msg08110.html] > Has support been added since that email correspondence? Please consider > reducing ambiguity in the documentation and update it to better reflect the > current/planned state of support for late arriving data in the Table API. > Thanks, > Alex
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280485979 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.ValidationException; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * Logical type of a fixed-length character string. + * + * The serialized string representation is {@code CHAR(n)} where {@code n} is the number of + * code points. {@code n} must have a value between 1 and 255 (both inclusive). If no length is + * specified, {@code n} is equal to 1. 
+ */ +@PublicEvolving +public final class CharType extends LogicalType { + + private static final int MIN_LENGTH = 1; + + private static final int MAX_LENGTH = 255; + + private static final int DEFAULT_LENGTH = 1; + + private static final String DEFAULT_FORMAT = "CHAR(%d)"; Review comment: nit: `DEFAULT_FORMAT` -> `FORMAT`?
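The quoted `CharType` constants bound the length to 1..255 code points. A stand-alone sketch of the constructor-time check those bounds imply is below; the real `CharType` throws Flink's `ValidationException` (imported in the diff), for which `IllegalArgumentException` stands in here, and the message wording is illustrative.

```java
public class CharLengthValidationExample {

    private static final int MIN_LENGTH = 1;
    private static final int MAX_LENGTH = 255;

    // Rejects lengths outside the documented CHAR(n) bounds and returns the
    // validated length so it can be assigned directly in a constructor.
    public static int validateLength(int length) {
        if (length < MIN_LENGTH || length > MAX_LENGTH) {
            throw new IllegalArgumentException(
                    "Character string length must be between " + MIN_LENGTH + " and "
                            + MAX_LENGTH + " (both inclusive), but was " + length + ".");
        }
        return length;
    }

    public static void main(String[] args) {
        assert validateLength(255) == 255;
        boolean rejected = false;
        try {
            validateLength(0);
        } catch (IllegalArgumentException expected) {
            rejected = true;
        }
        assert rejected;
    }
}
```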
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280484675 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * An enumeration of logical type roots containing static information about logical data types. + * + * A root is an essential description of a {@link LogicalType} without additional parameters. For + * example, a parameterized logical type {@code DECIMAL(12,3)} possesses all characteristics of its + * root {@code DECIMAL}. Additionally, a logical type root enables efficient comparison during the + * evaluation of types. + * + * The enumeration is very close to the SQL standard in terms of naming and completeness.
However, + * it reflects just a subset of the evolving standard and contains some extensions (such as {@code NULL} + * or {@code ANY}). + * + * See the type-implementing classes for a more detailed description of each type. + */ +@PublicEvolving +public enum LogicalTypeRoot { + + CHAR( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.CHARACTER_STRING), + + VARCHAR( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.CHARACTER_STRING), + + BOOLEAN( + LogicalTypeFamily.PREDEFINED), + + BINARY( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.BINARY_STRING), + + VARBINARY( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.BINARY_STRING), + + DECIMAL( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.NUMERIC, + LogicalTypeFamily.EXACT_NUMERIC), + + TINYINT( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.NUMERIC, + LogicalTypeFamily.EXACT_NUMERIC), + + SMALLINT( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.NUMERIC, + LogicalTypeFamily.EXACT_NUMERIC), + + INTEGER( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.NUMERIC, + LogicalTypeFamily.EXACT_NUMERIC), + + BIGINT( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.NUMERIC, + LogicalTypeFamily.EXACT_NUMERIC), + + FLOAT( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.NUMERIC, + LogicalTypeFamily.APPROXIMATE_NUMERIC), + + DOUBLE( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.NUMERIC, + LogicalTypeFamily.APPROXIMATE_NUMERIC), + + DATE( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.DATETIME), + + TIME_WITHOUT_TIME_ZONE( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.DATETIME), + + TIMESTAMP_WITHOUT_TIME_ZONE( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.DATETIME, + LogicalTypeFamily.TIMESTAMP), + + TIMESTAMP_WITH_TIME_ZONE( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.DATETIME, + LogicalTypeFamily.TIMESTAMP), + + TIMESTAMP_WITH_LOCAL_TIME_ZONE( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.DATETIME, + LogicalTypeFamily.TIMESTAMP, + 
LogicalTypeFamily.EXTENSION), + + INTERVAL_YEAR_MONTH( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.INTERVAL), + + INTERVAL_DAY_TIME( + LogicalTypeFamily.PREDEFINED, + LogicalTypeFamily.INTERVAL), + + ARRAY( + LogicalTypeFamily.CONSTRUCTED, +
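The enum-with-families pattern shown in the diff above — each root carrying a set of families for cheap classification — can be sketched with a minimal, self-contained example. The names below (`TypeRoot`, `TypeFamily`) are simplified stand-ins, not Flink's actual classes:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Simplified stand-in for LogicalTypeFamily. */
enum TypeFamily {
    PREDEFINED, NUMERIC, EXACT_NUMERIC, APPROXIMATE_NUMERIC, CHARACTER_STRING
}

/** Simplified stand-in for LogicalTypeRoot: each root stores its families. */
enum TypeRoot {
    VARCHAR(TypeFamily.PREDEFINED, TypeFamily.CHARACTER_STRING),
    DECIMAL(TypeFamily.PREDEFINED, TypeFamily.NUMERIC, TypeFamily.EXACT_NUMERIC),
    DOUBLE(TypeFamily.PREDEFINED, TypeFamily.NUMERIC, TypeFamily.APPROXIMATE_NUMERIC);

    private final Set<TypeFamily> families;

    TypeRoot(TypeFamily... families) {
        this.families = Collections.unmodifiableSet(
                new HashSet<>(Arrays.asList(families)));
    }

    /** Root-level check: cheaper than inspecting a fully parameterized type. */
    public boolean isFamily(TypeFamily family) {
        return families.contains(family);
    }
}
```

This is why the javadoc calls a root "an essential description without additional parameters": `DECIMAL(12,3)` and `DECIMAL(5,0)` share one root, so family checks never need the parameters.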
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280487361 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; + +/** + * A logical type that describes the data type of a value. It does not imply a concrete physical + * representation for transmission or storage but defines the boundaries between JVM-based languages + * and the table ecosystem. + * + * The definition of a logical type is similar to the SQL standard's "data type" terminology but + * also contains information about the nullability of a value for efficient handling of scalar + * expressions. 
+ * + * Subclasses of this class define characteristics of built-in or user-defined types. + * + * Instances of this class describe the fully parameterized, immutable type with additional + * information such as numeric precision or expected length. + * + * NOTE: A logical type is just a description of a type, a planner or runtime might not support + * every type in every logical precision yet! + */ +@PublicEvolving +public abstract class LogicalType implements Serializable { + + private final boolean isNullable; + + private final LogicalTypeRoot typeRoot; + + public LogicalType(boolean isNullable, LogicalTypeRoot typeRoot) { + this.isNullable = isNullable; + this.typeRoot = Preconditions.checkNotNull(typeRoot); + } + + /** +* Returns whether a value of this type can be {@code null}. +*/ + public boolean isNullable() { + return isNullable; + } + + /** +* Returns the root of this type. It is an essential description without additional parameters. +*/ + public LogicalTypeRoot getTypeRoot() { + return typeRoot; + } + + /** +* Returns a deep copy of this type with possibly different nullability. +* +* @param isNullable the intended nullability of the copied type +* @return a deep copy +*/ + public abstract LogicalType copy(boolean isNullable); + + /** +* Returns a deep copy of this type. +* +* @return a deep copy +*/ + public LogicalType copy() { Review comment: Make this method `final` with a comment to implement `LogicalType copy(boolean isNullable)` instead? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
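The review suggestion above — making `copy()` final so subclasses only implement `copy(boolean isNullable)` — can be sketched with simplified stand-in classes (not Flink's actual `LogicalType`):

```java
/** Simplified stand-in for LogicalType, illustrating the suggested pattern. */
abstract class SimpleLogicalType {
    private final boolean isNullable;

    SimpleLogicalType(boolean isNullable) {
        this.isNullable = isNullable;
    }

    public boolean isNullable() {
        return isNullable;
    }

    /** Subclasses implement only this variant. */
    public abstract SimpleLogicalType copy(boolean isNullable);

    /** Final, so subclasses cannot diverge: always delegates to copy(boolean). */
    public final SimpleLogicalType copy() {
        return copy(isNullable);
    }
}

/** Example subclass: it cannot forget to honor the no-arg copy() contract. */
final class IntType extends SimpleLogicalType {
    IntType(boolean isNullable) {
        super(isNullable);
    }

    @Override
    public SimpleLogicalType copy(boolean isNullable) {
        return new IntType(isNullable);
    }
}
```

The design benefit is that the "deep copy with same nullability" semantics live in exactly one place, the base class.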
[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#discussion_r280480232 ## File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.InstantiationUtil; + +import org.junit.Assert; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test for subclasses of {@link org.apache.flink.table.types.logical.LogicalType}. + */ +public class LogicalTypesTest { + + // + + private static void testAll( + LogicalType nullableType, + String typeString, + Class[] input, Review comment: `input` -> `supportedInputClasses` `output` -> `supportedOutputClasses` or a Javadoc? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r280463187 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java ## @@ -187,8 +186,6 @@ private void decorateNode(Integer vertexID, ObjectNode node) { node.put(PACT, "Operator"); } - StreamOperator operator = streamGraph.getStreamNode(vertexID).getOperator(); Review comment: Is this change relevant to the rest of the PR? If not, could you extract changes in this file to a separate `[hotfix]` commit? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r280472961 ## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/OneInputOperatorWrapper.java ## @@ -52,7 +53,9 @@ public OneInputOperatorWrapper(GeneratedClass> g public void setup(StreamTask containingTask, StreamConfig config, Output> output) { operator = generatedClass.newInstance(containingTask.getUserCodeClassLoader()); - operator.setup(containingTask, config, output); + if (operator instanceof SetupableStreamOperator) { + ((SetupableStreamOperator) operator).setup(containingTask, config, output); Review comment: Would it be viable to migrate this already to the non setupable operator? So to replace this if check with: ``` operator = generatedClass.newInstance(containingTask.getUserCodeClassLoader(), containingTask, config, output); ``` ? If it's non trivial it could be done as some kind of follow up step. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
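The migration discussed above — replacing two-phase `setup(...)` initialization with constructor injection via a factory — can be sketched with hypothetical, simplified stand-ins (not Flink's actual operator APIs):

```java
interface Operator {
    String describe();
}

/** Legacy style: mutable two-phase initialization via setup(). */
class SetupableOperator implements Operator {
    private String taskName;

    void setup(String taskName) { // must be called after construction
        this.taskName = taskName;
    }

    public String describe() {
        return "setup:" + taskName;
    }
}

/** Migrated style: everything required is passed to the constructor. */
class ConstructedOperator implements Operator {
    private final String taskName;

    ConstructedOperator(String taskName) {
        this.taskName = taskName;
    }

    public String describe() {
        return "constructed:" + taskName;
    }
}

class OperatorWrapper {
    /** Mirrors the instanceof bridge in the diff: only legacy operators need setup(). */
    static Operator create(boolean legacy, String taskName) {
        if (legacy) {
            SetupableOperator op = new SetupableOperator();
            op.setup(taskName);
            return op;
        }
        return new ConstructedOperator(taskName);
    }
}
```

Once every generated operator takes its dependencies in the constructor, the `instanceof` branch (and the partially initialized state it guards against) disappears.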
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r280466736 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -480,13 +483,17 @@ private StreamGraph generateInternal(List> transformatio streamGraph.addSource(source.getId(), slotSharingGroup, source.getCoLocationGroupKey(), - source.getOperator(), + source.getOperatorFactory(), null, source.getOutputType(), "Source: " + source.getName()); - if (source.getOperator().getUserFunction() instanceof InputFormatSourceFunction) { - InputFormatSourceFunction fs = (InputFormatSourceFunction) source.getOperator().getUserFunction(); - streamGraph.setInputFormat(source.getId(), fs.getFormat()); + if (source.getOperatorFactory() instanceof SimpleOperatorFactory) { Review comment: Possible issue: this code doesn't support setting input formats for non-`SimpleOperatorFactory` instances. By implementing the check this way we support all of the present cases, but it makes it kind of strange that we don't support this for the new way - we have no migration path to get rid of `StreamOperators` from the `StreamTransformation`. Could this if check be reworked to something like: ``` if (source.getOperatorFactory() instanceof InputFormatSourceOperatorFactory) { streamGraph.setInputFormat(id, ((InputFormatSourceOperatorFactory) source.getOperatorFactory()).getFormat()); } ``` ? A combination of `SimpleOperatorFactory` and `InputFormatSourceOperatorFactory` could implement `getFormat() { return this.getOperator().getUserFunction().getFormat(); }`, while future non-`SimpleOperatorFactory` implementations could be supported as well.
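The reworked check suggested above — dispatching on a capability interface of the factory instead of unwrapping `SimpleOperatorFactory` — can be sketched with hypothetical, simplified stand-ins (not Flink's actual classes):

```java
/** Simplified stand-in for StreamOperatorFactory. */
interface OperatorFactory {
}

/** Capability interface: any factory kind that can supply an input format implements it. */
interface InputFormatOperatorFactory extends OperatorFactory {
    String getFormat();
}

/** A "simple" factory wrapping a legacy operator can still expose its format. */
class SimpleInputFormatFactory implements InputFormatOperatorFactory {
    private final String format;

    SimpleInputFormatFactory(String format) {
        this.format = format;
    }

    public String getFormat() {
        return format;
    }
}

class GraphBuilder {
    /** Checks the capability interface, so future non-simple factories work unchanged. */
    static String resolveInputFormat(OperatorFactory factory) {
        if (factory instanceof InputFormatOperatorFactory) {
            return ((InputFormatOperatorFactory) factory).getFormat();
        }
        return null; // no input format to set on the stream graph
    }
}
```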
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r280467152 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java ## @@ -252,9 +253,12 @@ private boolean generateNodeHash( if (LOG.isDebugEnabled()) { String udfClassName = ""; - if (node.getOperator() instanceof AbstractUdfStreamOperator) { - udfClassName = ((AbstractUdfStreamOperator) node.getOperator()) - .getUserFunction().getClass().getName(); + if (node.getOperatorFactory() instanceof SimpleOperatorFactory) { Review comment: ditto about factories. `if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory)`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r280470782 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import java.io.Serializable; + +/** + * A factory to create {@link StreamOperator}. + * + * @param The output type of the operator + */ Review comment: `@PublicEvolving`? `@Internal`? (question to someone from API team) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r280474556 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SetupableStreamOperator.java ## @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +/** + * Stream operators can implement this interface if they need access to the context and the output. + * + * @param The output type of the operator + */ +@PublicEvolving +public interface SetupableStreamOperator { Review comment: Mark the class `@Deprecated` and add java doc > This class is deprecated in favour of using `StreamOperatorFactory` and it's `StreamOperatorFactory#createStreamOperator` and passing the required parameters to the `Operator`'s constructor in create method. ? This is an automated message from the Apache Git Service. 
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r280467451 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ## @@ -648,12 +649,14 @@ private void configureCheckpointing() { final ArrayList hooks = new ArrayList<>(); for (StreamNode node : streamGraph.getStreamNodes()) { - StreamOperator op = node.getOperator(); - if (op instanceof AbstractUdfStreamOperator) { - Function f = ((AbstractUdfStreamOperator) op).getUserFunction(); - - if (f instanceof WithMasterCheckpointHook) { - hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook) f)); + if (node.getOperatorFactory() instanceof SimpleOperatorFactory) { Review comment: ditto `if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory)`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS
piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS URL: https://github.com/apache/flink/pull/8117#discussion_r280465958 ## File path: flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java ## @@ -48,6 +55,33 @@ public void configure(Configuration config) { public FileSystem create(URI fsUri) throws IOException { checkNotNull(fsUri, "passed file system URI object should not be null"); LOG.info("Trying to load and instantiate Azure File System"); - return new AzureFileSystem(fsUri, flinkConfig); + return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig)); + } + + // uri is of the form: wasb(s)://yourcontai...@youraccount.blob.core.windows.net/testDir + private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(URI fsUri, Configuration flinkConfig) throws IOException { + org.apache.hadoop.conf.Configuration hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); Review comment: Updated the review to incorporate this. I've filed: https://jira.apache.org/jira/browse/FLINK-12394 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12394) Update FileSystem factory classes to use HadoopConfigLoader
Piyush Narang created FLINK-12394: - Summary: Update FileSystem factory classes to use HadoopConfigLoader Key: FLINK-12394 URL: https://issues.apache.org/jira/browse/FLINK-12394 Project: Flink Issue Type: Task Reporter: Piyush Narang As part of the review feedback in PR: https://github.com/apache/flink/pull/8117, we noticed that there are a couple of existing FileSystem factories that are manually copying relevant config entries from flink config -> hadoop config as part of the factory configuration. We already have an existing class to do this - HadoopConfigLoader. This jira is to track the work to update these existing file system factories to switch over to the HadoopConfigLoader once the above PR is merged. Factories that need to be fixed: SwiftFileSystemFactory OSSFileSystemFactory HadoopFsFactory -- This message was sent by Atlassian JIRA (v7.6.3#76005)
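The manual copying the JIRA above describes — mirroring prefixed entries from the Flink config into a Hadoop config — boils down to a small key-rewriting loop. A hedged, self-contained sketch using plain maps (`ConfigMirror` is a hypothetical name; the real consolidation lives in Flink's `HadoopConfigLoader`):

```java
import java.util.HashMap;
import java.util.Map;

class ConfigMirror {
    /** Copies every key under flinkPrefix into a new map under hadoopPrefix. */
    static Map<String, String> mirror(Map<String, String> flinkConfig,
                                      String flinkPrefix, String hadoopPrefix) {
        Map<String, String> hadoopConfig = new HashMap<>();
        for (Map.Entry<String, String> e : flinkConfig.entrySet()) {
            if (e.getKey().startsWith(flinkPrefix)) {
                String suffix = e.getKey().substring(flinkPrefix.length());
                hadoopConfig.put(hadoopPrefix + suffix, e.getValue());
            }
        }
        return hadoopConfig;
    }
}
```

Centralizing this loop is exactly the point of the ticket: each filesystem factory currently re-implements it with its own prefixes.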
[GitHub] [flink] lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluster mode
lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluster mode URL: https://github.com/apache/flink/pull/8254#issuecomment-488708907 ok
[GitHub] [flink] tillrohrmann commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluster mode
tillrohrmann commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluster mode URL: https://github.com/apache/flink/pull/8254#issuecomment-488708092 The commit message and the PR title were a bit misleading. I've updated them.
[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r280451532 ## File path: flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml ## @@ -0,0 +1,121 @@ + + +http://maven.apache.org/POM/4.0.0; +xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; +xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + + 4.0.0 + + + org.apache.flink + flink-end-to-end-tests + 1.8-SNAPSHOT Review comment: Sorry for the long review time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r280450967 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java ## @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.gcp.pubsub; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeIdsForCheckpoint; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.Acknowledger; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema; +import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.auth.Credentials; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; +import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.concurrent.CancellationException; +import 
java.util.concurrent.TimeUnit; + +import static com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder; +import static java.util.Collections.emptyList; + +/** + * PubSub Source, this Source will consume PubSub messages from a subscription and Acknowledge them on the next checkpoint. + * This ensures every message will get acknowledged at least once. + */ +public class PubSubSource extends RichSourceFunction + implements Acknowledger, ResultTypeQueryable, ParallelSourceFunction, CheckpointListener, ListCheckpointed> { + private static final Logger LOG = LoggerFactory.getLogger(PubSubSource.class); + protected final PubSubDeserializationSchema deserializationSchema; + protected final PubSubSubscriberFactory pubSubSubscriberFactory; + protected final Credentials credentials; + protected final String projectSubscriptionName; + protected final int maxMessagesPerPull; + protected final int maxMessagesToAcknowledge; + protected final AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory; + + protected transient AcknowledgeOnCheckpoint acknowledgeOnCheckpoint; + protected transient SubscriberStub subscriber; + protected transient PullRequest pullRequest; + protected transient EventLoopGroup eventLoopGroup; + + protected transient volatile boolean isRunning; + protected transient volatile ApiFuture messagesFuture; + + PubSubSource(PubSubDeserializationSchema deserializationSchema, PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, String projectSubscriptionName, int maxMessagesPerPull, int maxMessagesToAcknowledge, AcknowledgeOnCheckpointFactory acknowledgeOnCheckpointFactory) { +
[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r280449150 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,144 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-gcp-pubsub{{ site.scala_version_suffix }} + {{ site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html) +for information about how to package the program with the libraries for +cluster execution. + +## Consuming or Producing PubSubMessages + +The connector provides a connectors for receiving and sending messages from and to Google PubSub. +Google PubSub has an `Atleast-Once` guarantee and as such the connector delivers the same guarantees. + +### PubSub SourceFunction + +The class `PubSubSource` has a builder to create PubSubsources: `PubSubSource.newBuilder(...)` + +There are several optional methods to alter how the PubSubSource is created, the bare minimum is to provide a Google project, Pubsub subscription and a way to deserialize the PubSubMessages. + +Example: + + + +{% highlight java %} +StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + +DeserializationSchema deserializer = (...); Review comment: the connector now uses a `PubSubDeserializationSchema` (see comments on it below) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280432286 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -392,43 +379,10 @@ public JobMaster( final JobVertexID vertexID, final ExecutionAttemptID executionAttempt) { - final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt); - if (execution == null) { - // can happen when JobManager had already unregistered this execution upon on task failure, - // but TaskManager get some delay to aware of that situation - if (log.isDebugEnabled()) { - log.debug("Can not find Execution for attempt {}.", executionAttempt); - } - // but we should TaskManager be aware of this - return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttempt)); - } - - final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID); - if (vertex == null) { - log.error("Cannot find execution vertex for vertex ID {}.", vertexID); - return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID)); - } - - if (vertex.getSplitAssigner() == null) { - log.error("No InputSplitAssigner for vertex ID {}.", vertexID); - return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID)); - } - - final InputSplit nextInputSplit = execution.getNextInputSplit(); - - if (log.isDebugEnabled()) { - log.debug("Send next input split {}.", nextInputSplit); - } - try { - final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit); - return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit)); - } catch (Exception ex) { - log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex); - IOException reason = 
new IOException("Could not serialize the next input split of class " + - nextInputSplit.getClass() + ".", ex); - vertex.fail(reason); - return FutureUtils.completedExceptionally(reason); + return CompletableFuture.completedFuture(schedulerNG.requestNextInputSplit(vertexID, executionAttempt)); + } catch (IOException e) { Review comment: I would also add a logging statement logging the `e` because it might potentially leave the distributed component here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
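The logging pattern tillrohrmann asks for can be reduced to a small sketch: log the exception locally before completing a future exceptionally, since the future may cross an RPC boundary and leave the distributed component. This is an illustrative sketch, not the actual `JobMaster` code; the helper name `logAndCompleteExceptionally` and the use of `System.err` in place of the SLF4J logger are assumptions made to keep the example self-contained.

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the log-before-propagating pattern: record the exception in this
// component's own log before handing it to a future that leaves the component.
class LogBeforePropagating {
    static <T> CompletableFuture<T> logAndCompleteExceptionally(String context, Exception e) {
        // Log first: once the future crosses the RPC boundary, the local log
        // may be the only place where this component records the failure.
        System.err.println(context + ": " + e);
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(e);
        return future;
    }
}
```

The caller's `catch (IOException e)` branch would then return the result of this helper instead of completing the future silently.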
[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r280448443 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,144 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-gcp-pubsub{{ site.scala_version_suffix }} + {{ site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html) +for information about how to package the program with the libraries for +cluster execution. Review comment: Add a note that this connector is new and hasn't received widespread testing yet. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r280448936 ## File path: docs/dev/connectors/pubsub.md ## @@ -0,0 +1,144 @@ +--- +title: "Google PubSub" +nav-title: PubSub +nav-parent_id: connectors +nav-pos: 7 +--- + + +This connector provides a Source and Sink that can read from and write to +[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add the +following dependency to your project: + +{% highlight xml %} + + org.apache.flink + flink-connector-gcp-pubsub{{ site.scala_version_suffix }} + {{ site.version }} + +{% endhighlight %} + +Note that the streaming connectors are currently not part of the binary +distribution. See +[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html) +for information about how to package the program with the libraries for +cluster execution. + +## Consuming or Producing PubSubMessages + +The connector provides a connectors for receiving and sending messages from and to Google PubSub. +Google PubSub has an `Atleast-Once` guarantee and as such the connector delivers the same guarantees. Review comment: ```suggestion Google PubSub has an `at-least-once` guarantee and as such the connector delivers the same guarantees. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r280449898 ## File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/PubSubDeserializationSchema.java ## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.gcp.pubsub.common; + +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +import com.google.pubsub.v1.PubsubMessage; + +import java.io.Serializable; + +/** + * The deserialization schema describes how to turn the PubsubMessages + * into data types (Java/Scala objects) that are processed by Flink. + * + * @param The type created by the deserialization schema. + */ +public interface PubSubDeserializationSchema extends Serializable, ResultTypeQueryable { Review comment: What's the benefit of using this schema instead of the `DeserializationSchema`, shared by the other connectors? 
I think it would be a lot better to use the same deserialization schema as the other connectors, so that people can re-use existing implementations.
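The reuse rmetzger suggests could look roughly like the adapter below. The two interfaces are minimal stand-ins for Flink's `DeserializationSchema` and the proposed `PubSubDeserializationSchema` (the real ones also extend `ResultTypeQueryable` and carry more methods), and the adapter class name is hypothetical; the point is only that a PubSub-specific schema can delegate to any existing schema implementation instead of forcing a rewrite.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Minimal stand-ins for the Flink types (assumptions, not the real interfaces).
interface DeserializationSchema<T> extends Serializable {
    T deserialize(byte[] message);
}

interface PubSubDeserializationSchema<T> extends Serializable {
    T deserialize(byte[] messagePayload);
}

// Hypothetical adapter: reuse an existing DeserializationSchema for PubSub
// payloads, so implementations written for other connectors carry over.
class ReusingPubSubSchema<T> implements PubSubDeserializationSchema<T> {
    private final DeserializationSchema<T> inner;

    ReusingPubSubSchema(DeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public T deserialize(byte[] messagePayload) {
        // Delegate to the shared schema; any PubSub-specific unwrapping
        // (e.g. extracting the payload from a PubsubMessage) would happen
        // before this call.
        return inner.deserialize(messagePayload);
    }
}
```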
[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE) URL: https://github.com/apache/flink/pull/6594#discussion_r280454402 ## File path: flink-end-to-end-tests/test-scripts/test_streaming_gcp_pubsub.sh ## @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cd "${END_TO_END_DIR}/flink-connector-gcp-pubsub-emulator-tests" + +mvn test -DskipTests=false Review comment: for me, the tests didn't execute: ``` ➜ flink-connector-gcp-pubsub-emulator-tests git:(pr6594) ✗ mvn test -DskipTests=false [INFO] Scanning for projects... 
[INFO]
[INFO] -< org.apache.flink:flink-connector-gcp-pubsub-emulator-tests >-
[INFO] Building flink-connector-gcp-pubsub-emulator-tests 1.9-SNAPSHOT
[INFO] [ jar ]-
[INFO]
[INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven-version) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-versions) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO]
[INFO] --- directory-maven-plugin:0.1:highest-basedir (directories) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO] Highest basedir set to: /Users/robert/Projects/flink
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO]
[INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /Users/robert/Projects/flink/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/main/resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-resources-plugin:3.1.0:testResources (default-testResources) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ flink-connector-gcp-pubsub-emulator-tests ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] BUILD SUCCESS
[INFO]
[INFO] Total time: 7.389 s
[INFO] Finished at: 2019-05-02T16:47:04+02:00
[INFO]
```
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280434081 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -582,26 +458,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) { final String registrationName, final KvStateID kvStateId, final InetSocketAddress kvStateServerAddress) { - if (jobGraph.getJobID().equals(jobId)) { - if (log.isDebugEnabled()) { - log.debug("Key value state registered for job {} under name {}.", - jobGraph.getJobID(), registrationName); - } - - try { - executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered( - jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress); - return CompletableFuture.completedFuture(Acknowledge.get()); - } catch (Exception e) { - log.error("Failed to notify KvStateRegistry about registration {}.", registrationName, e); - return FutureUtils.completedExceptionally(e); - } - } else { - if (log.isDebugEnabled()) { - log.debug("Notification about key-value state registration for unknown job {} received.", jobId); - } - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + try { + schedulerNG.notifyKvStateRegistered(jobId, jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress); + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (FlinkJobNotFoundException e) { Review comment: same here This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280432648 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -437,43 +391,20 @@ public JobMaster( final IntermediateDataSetID intermediateResultId, final ResultPartitionID resultPartitionId) { - final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId()); - if (execution != null) { - return CompletableFuture.completedFuture(execution.getState()); - } - else { - final IntermediateResult intermediateResult = - executionGraph.getAllIntermediateResults().get(intermediateResultId); - - if (intermediateResult != null) { - // Try to find the producing execution - Execution producerExecution = intermediateResult - .getPartitionById(resultPartitionId.getPartitionId()) - .getProducer() - .getCurrentExecutionAttempt(); - - if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) { - return CompletableFuture.completedFuture(producerExecution.getState()); - } else { - return FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionId)); - } - } else { - return FutureUtils.completedExceptionally(new IllegalArgumentException("Intermediate data set with ID " - + intermediateResultId + " not found.")); - } + try { + return CompletableFuture.completedFuture(schedulerNG.requestPartitionState(intermediateResultId, resultPartitionId)); + } catch (PartitionProducerDisposedException e) { Review comment: I would also log the exception here before it leaves the component. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280447981 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java ## @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.function.FunctionUtils; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280453988 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java ## @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.function.FunctionUtils; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280453953 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java ## @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.queryablestate.KvStateID; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionGraphException; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.query.KvStateLocation; +import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.function.FunctionUtils; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280434680 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -809,80 +625,18 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { final boolean advanceToEndOfEventTime, final Time timeout) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - - if (checkpointCoordinator == null) { - return FutureUtils.completedExceptionally(new IllegalStateException( - String.format("Job %s is not a streaming job.", jobGraph.getJobID()))); - } - - if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) { - log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID()); - - return FutureUtils.completedExceptionally(new IllegalStateException( - "No savepoint directory configured. You can either specify a directory " + - "while cancelling via -s :targetDirectory or configure a cluster-wide " + - "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.")); - } - - final long now = System.currentTimeMillis(); - - // we stop the checkpoint coordinator so that we are guaranteed - // to have only the data of the synchronous savepoint committed. - // in case of failure, and if the job restarts, the coordinator - // will be restarted by the CheckpointCoordinatorDeActivator. - checkpointCoordinator.stopCheckpointScheduler(); - - final CompletableFuture<String> savepointFuture = checkpointCoordinator - .triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory) - .handleAsync((completedCheckpoint, throwable) -> { - if (throwable != null) { - log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage()); - throw new CompletionException(throwable); - } - return completedCheckpoint.getExternalPointer(); - }, getMainThreadExecutor()); - - final CompletableFuture<JobStatus> terminationFuture = executionGraph - .getTerminationFuture() - .handleAsync((jobstatus, throwable) -> { - - if (throwable != null) { - log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage()); - throw new CompletionException(throwable); - } else if (jobstatus != JobStatus.FINISHED) { - log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", jobGraph.getJobID(), jobstatus); - throw new CompletionException(new FlinkException("Reached state " + jobstatus + " instead of FINISHED.")); - } - return jobstatus; - }, getMainThreadExecutor()); - - return savepointFuture.thenCompose((path) -> - terminationFuture.thenApply((jobStatus -> path))); - } - - private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) { - if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) { - try { - checkpointCoordinator.startCheckpointScheduler(); - } catch (IllegalStateException ignored) { - // Concurrent shut down of the coordinator - } - } + return schedulerNG.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime); } @Override public CompletableFuture requestOperatorBackPressureStats(final JobVertexID jobVertexId) { - final ExecutionJobVertex jobVertex =
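The future composition removed from `JobMaster` (and now delegated to `schedulerNG.stopWithSavepoint`) only reports the savepoint path after the job terminates in `FINISHED`. A minimal sketch of that shape with plain `CompletableFuture`s; the method names, status strings, and savepoint path below are illustrative stand-ins, not Flink's actual API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class StopWithSavepointSketch {

    // Stand-in for triggering the synchronous savepoint (hypothetical path).
    static CompletableFuture<String> triggerSynchronousSavepoint() {
        return CompletableFuture.completedFuture("file:///tmp/savepoint-1");
    }

    // Stand-in for the execution graph's termination future.
    static CompletableFuture<String> terminationFuture(String finalStatus) {
        return CompletableFuture.completedFuture(finalStatus);
    }

    // The savepoint path is only reported once the job has reached FINISHED;
    // any other terminal state fails the returned future.
    static CompletableFuture<String> stopWithSavepoint(String finalStatus) {
        CompletableFuture<String> savepointFuture = triggerSynchronousSavepoint();
        CompletableFuture<String> statusFuture = terminationFuture(finalStatus)
            .thenApply(status -> {
                if (!"FINISHED".equals(status)) {
                    throw new CompletionException(
                        new IllegalStateException("Reached state " + status + " instead of FINISHED."));
                }
                return status;
            });
        // Compose: resolve to the savepoint path, but only after the status check passed.
        return savepointFuture.thenCompose(path -> statusFuture.thenApply(status -> path));
    }

    public static void main(String[] args) {
        System.out.println(stopWithSavepoint("FINISHED").join());
    }
}
```

The `thenCompose`/`thenApply` chain mirrors the `savepointFuture.thenCompose((path) -> terminationFuture.thenApply(...))` line in the removed code: a failed termination future poisons the composed result even though the savepoint itself succeeded.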
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280438925 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -1006,101 +760,58 @@ private Acknowledge suspendExecution(final Exception cause) { return Acknowledge.get(); } - private void assignExecutionGraph( - ExecutionGraph newExecutionGraph, + private void assignScheduler( + SchedulerNG newScheduler, JobManagerJobMetricGroup newJobManagerJobMetricGroup) { validateRunsInMainThread(); - checkState(executionGraph.getState().isTerminalState()); + checkState(schedulerNG.requestJobStatus().isTerminalState()); checkState(jobManagerJobMetricGroup == null); - executionGraph = newExecutionGraph; + schedulerNG = newScheduler; jobManagerJobMetricGroup = newJobManagerJobMetricGroup; } - private void resetAndScheduleExecutionGraph() throws Exception { + private void resetAndStartScheduler() throws Exception { validateRunsInMainThread(); - final CompletableFuture executionGraphAssignedFuture; + final CompletableFuture schedulerAssignedFuture; - if (executionGraph.getState() == JobStatus.CREATED) { - executionGraphAssignedFuture = CompletableFuture.completedFuture(null); - executionGraph.start(getMainThreadExecutor()); + if (schedulerNG.requestJobStatus() == JobStatus.CREATED) { Review comment: This should be gone once https://issues.apache.org/jira/browse/FLINK-11719 has been resolved. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
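The guarded swap in `assignScheduler` — refuse to install a new scheduler while the old one is not yet terminal — is a small, self-contained pattern. A sketch under stated assumptions: `Scheduler` here is a hypothetical stand-in for `SchedulerNG`, reduced to the one status query the check needs:

```java
public class AssignSchedulerSketch {

    // Hypothetical stand-in for SchedulerNG; only the terminal-state query matters here.
    interface Scheduler {
        boolean isTerminal();
    }

    private Scheduler scheduler;

    AssignSchedulerSketch(Scheduler initial) {
        this.scheduler = initial;
    }

    // Same contract as Flink's Preconditions.checkState.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Mirrors the diff: the old component must have reached a terminal state
    // before the replacement may be installed.
    void assignScheduler(Scheduler newScheduler) {
        checkState(scheduler.isTerminal(), "old scheduler is still running");
        scheduler = newScheduler;
    }

    Scheduler current() {
        return scheduler;
    }
}
```

The precondition turns a silent state-corruption bug (two live schedulers for one job) into an immediate `IllegalStateException` on the main thread.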
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280433723 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -501,76 +432,21 @@ public void acknowledgeCheckpoint( final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint( - jobID, - executionAttemptID, - checkpointId, - checkpointMetrics, - checkpointState); - - if (checkpointCoordinator != null) { - getRpcService().execute(() -> { - try { - checkpointCoordinator.receiveAcknowledgeMessage(ackMessage); - } catch (Throwable t) { - log.warn("Error while processing checkpoint acknowledgement message", t); - } - }); - } else { - String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator"; - if (executionGraph.getState() == JobStatus.RUNNING) { - log.error(errorMessage, jobGraph.getJobID()); - } else { - log.debug(errorMessage, jobGraph.getJobID()); - } - } + schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState); } // TODO: This method needs a leader session ID @Override public void declineCheckpoint(DeclineCheckpoint decline) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - - if (checkpointCoordinator != null) { - getRpcService().execute(() -> { - try { - checkpointCoordinator.receiveDeclineMessage(decline); - } catch (Exception e) { - log.error("Error in CheckpointCoordinator while processing {}", decline, e); - } - }); - } else { - String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator"; - if (executionGraph.getState() == JobStatus.RUNNING) { - log.error(errorMessage, jobGraph.getJobID()); - } else { - log.debug(errorMessage, jobGraph.getJobID()); - } - } + schedulerNG.declineCheckpoint(decline); } @Override public CompletableFuture requestKvStateLocation(final JobID jobId, final String registrationName) { - // sanity check for the correct JobID - if (jobGraph.getJobID().equals(jobId)) { - if (log.isDebugEnabled()) { - log.debug("Lookup key-value state for job {} with registration " + - "name {}.", jobGraph.getJobID(), registrationName); - } - - final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry(); - final KvStateLocation location = registry.getKvStateLocation(registrationName); - if (location != null) { - return CompletableFuture.completedFuture(location); - } else { - return FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName)); - } - } else { - if (log.isDebugEnabled()) { - log.debug("Request of key-value state location for unknown job {} received.", jobId); - } - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + try { + return CompletableFuture.completedFuture(schedulerNG.requestKvStateLocation(jobId, registrationName)); + } catch (UnknownKvStateLocation | FlinkJobNotFoundException e) { Review comment: logging statement missing
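Till's review comment asks for a log statement in the new catch branch, which the refactoring dropped along with the old `log.debug` calls. A minimal, self-contained sketch of that shape; the registry, exception type, and names here are stand-ins for illustration, not Flink's `UnknownKvStateLocation` API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class KvStateLookupSketch {

    // Stand-in for UnknownKvStateLocation / FlinkJobNotFoundException.
    static class UnknownLocationException extends Exception {
        UnknownLocationException(String name) {
            super("No location registered under " + name);
        }
    }

    private final Map<String, String> registry = new HashMap<>();

    void register(String name, String location) {
        registry.put(name, location);
    }

    String lookup(String name) throws UnknownLocationException {
        String location = registry.get(name);
        if (location == null) {
            throw new UnknownLocationException(name);
        }
        return location;
    }

    CompletableFuture<String> requestLocation(String name) {
        try {
            return CompletableFuture.completedFuture(lookup(name));
        } catch (UnknownLocationException e) {
            // The reviewer's point: log before completing exceptionally,
            // otherwise the failure leaves no trace on the server side.
            System.out.println("Lookup of key-value state failed: " + e.getMessage());
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }
}
```

Returning an exceptionally completed future instead of throwing keeps the RPC contract intact while the log line preserves the diagnostic information the old code emitted.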
[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280434123 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ## @@ -611,26 +473,11 @@ public void declineCheckpoint(DeclineCheckpoint decline) { JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) { - if (jobGraph.getJobID().equals(jobId)) { - if (log.isDebugEnabled()) { - log.debug("Key value state unregistered for job {} under name {}.", - jobGraph.getJobID(), registrationName); - } - - try { - executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered( - jobVertexId, keyGroupRange, registrationName); - - return CompletableFuture.completedFuture(Acknowledge.get()); - } catch (Exception e) { - log.error("Failed to notify KvStateRegistry about unregistration {}.", registrationName, e); - return FutureUtils.completedExceptionally(e); - } - } else { - if (log.isDebugEnabled()) { - log.debug("Notification about key-value state deregistration for unknown job {} received.", jobId); - } - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + try { + schedulerNG.notifyKvStateUnregistered(jobId, jobVertexId, keyGroupRange, registrationName); + return CompletableFuture.completedFuture(Acknowledge.get()); + } catch (FlinkJobNotFoundException e) { Review comment: and here
[GitHub] [flink] lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-488702391 hi, @tillrohrmann, Sorry, it's OK. I see you just added a future utility, and you have fixed the issue in #8334 too. Best.
[jira] [Created] (FLINK-12393) Add the user-facing classes of the new type system
Timo Walther created FLINK-12393: Summary: Add the user-facing classes of the new type system Key: FLINK-12393 URL: https://issues.apache.org/jira/browse/FLINK-12393 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Timo Walther Assignee: Timo Walther FLINK-12253 introduces logical types that will be used mostly internally. Users will use the {{DataType}} stack described in FLIP-37. This issue describes the class hierarchy around this class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] lamber-ken commented on issue #8334: [FLINK-12219] Add utility to check for normal future completion
lamber-ken commented on issue #8334: [FLINK-12219] Add utility to check for normal future completion URL: https://github.com/apache/flink/pull/8334#issuecomment-488700701 @tillrohrmann, LGTM.
[GitHub] [flink] flinkbot commented on issue #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
flinkbot commented on issue #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335#issuecomment-488698361 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode
lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode URL: https://github.com/apache/flink/pull/8254#issuecomment-488698323 hi, @tillrohrmann, I think your fix [tillrohrmann@524ead4](https://github.com/tillrohrmann/flink/commit/524ead44f68ff02d52e8ff561c9e235e5b353123) is fantastic. Like [FLINK-12247](https://github.com/apache/flink/pull/8250), we have used Flink from 1.2.1 to 1.6.3 and were puzzled by this problem for a long time: the YARN application only stops once the number of failed job attempts reaches the maximum attempt history size, and there is no error log from which we could find out why the Flink job failed but the YARN application did not stop in detached mode. Other users may have hit the same bug without mentioning or fixing it. So I think it is better to log the error information and stop the application, as in your solution [tillrohrmann@524ead4](https://github.com/tillrohrmann/flink/commit/524ead44f68ff02d52e8ff561c9e235e5b353123). Otherwise, whenever an unexpected exception happens, the YARN application cannot stop, and users have to find out the reason themselves, which is hard on them. Best.
[jira] [Updated] (FLINK-12253) Setup a class hierarchy for the new logical type system
[ https://issues.apache.org/jira/browse/FLINK-12253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12253: --- Labels: pull-request-available (was: ) > Setup a class hierarchy for the new logical type system > --- > > Key: FLINK-12253 > URL: https://issues.apache.org/jira/browse/FLINK-12253 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Setup a new class hierarchy around {{LogicalType}} in {{table-common}}. > The classes implement the types listed in the table of FLIP-37. > The classes won't be connected to the API yet. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] twalthr opened a new pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system
twalthr opened a new pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system URL: https://github.com/apache/flink/pull/8335 ## What is the purpose of the change This PR implements all logical types that Flink's type system should support in the near future. As the name indicates, these type classes are purely logical to allow declarations and intentions across modules and in the API. Planners/runtime can still decide which type and precision is supported physically. An exact listing can be found in FLIP-37: https://docs.google.com/document/d/1a9HUb6OaBIoj9IRfbILcMFPrOL7ALeZ3rVI66dvA2_U/edit# The most important content of this PR are the JavaDoc comments that clearly define each type and should avoid ambiguity. Catalogs/connectors/planners should adapt to those definitions. ## Brief change log - 27 logical types added ## Verifying this change - `org.apache.flink.table.types.LogicalTypesTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs
[jira] [Updated] (FLINK-12253) Setup a class hierarchy for the new logical type system
[ https://issues.apache.org/jira/browse/FLINK-12253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-12253: - Description: Setup a new class hierarchy around {{LogicalType}} in {{table-common}}. The classes implement the types listed in the table of FLIP-37. The classes won't be connected to the API yet. was: Setup a new class hierarchy around {{DataType}} and {{LogicalType}} in {{table-common}}. The classes implement the types listed in the table of FLIP-37. The classes won't be connected to the API yet. > Setup a class hierarchy for the new logical type system > --- > > Key: FLINK-12253 > URL: https://issues.apache.org/jira/browse/FLINK-12253 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > Setup a new class hierarchy around {{LogicalType}} in {{table-common}}. > The classes implement the types listed in the table of FLIP-37. > The classes won't be connected to the API yet. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12253) Setup a class hierarchy for the new logical type system
[ https://issues.apache.org/jira/browse/FLINK-12253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-12253: - Summary: Setup a class hierarchy for the new logical type system (was: Setup a class hierarchy for the new type system) > Setup a class hierarchy for the new logical type system > --- > > Key: FLINK-12253 > URL: https://issues.apache.org/jira/browse/FLINK-12253 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > Setup a new class hierarchy around {{DataType}} and {{LogicalType}} in > {{table-common}}. > The classes implement the types listed in the table of FLIP-37. > The classes won't be connected to the API yet. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy URL: https://github.com/apache/flink/pull/8296#discussion_r280416137 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for streaming job which will schedule all tasks at the same time. + */ +public class EagerSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(false); + + public EagerSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { + this.schedulerOperations = checkNotNull(schedulerOperations); + this.schedulingTopology = checkNotNull(schedulingTopology); + } + + @Override + public void startScheduling() { + // Schedule all the vertices in scheduling topology at the same time. Review comment: I think this comment is not needed as it is already covered in the class' javadoc.
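The eager strategy's core behavior under review — collect every vertex in the topology and hand all of them to the scheduler in a single call — can be sketched with minimal stand-in types. These interfaces are simplifications for illustration, not Flink's actual `SchedulingTopology`/`SchedulerOperations` signatures:

```java
import java.util.ArrayList;
import java.util.List;

public class EagerSchedulingSketch {

    // Simplified stand-in for the scheduling topology: just vertex IDs.
    interface SchedulingTopology {
        Iterable<String> getVertexIds();
    }

    // Simplified stand-in for the scheduler operations facade.
    interface SchedulerOperations {
        void allocateSlotsAndDeploy(List<String> vertexIds);
    }

    // Eager scheduling: every vertex in the topology is deployed at once,
    // in exactly one allocateSlotsAndDeploy() call.
    static void startScheduling(SchedulingTopology topology, SchedulerOperations operations) {
        List<String> allVertices = new ArrayList<>();
        for (String id : topology.getVertexIds()) {
            allVertices.add(id);
        }
        operations.allocateSlotsAndDeploy(allVertices);
    }
}
```

This single-call property is exactly what GJL later asks the tests to assert, instead of only counting scheduled vertices.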
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy URL: https://github.com/apache/flink/pull/8296#discussion_r279634373 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingVertex.java ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.Collection; + +/** + * A Simple scheduling vertex for testing purposes. + */ +public class TestingSchedulingVertex implements SchedulingVertex { + + private final ExecutionVertexID executionVertexId; + + public TestingSchedulingVertex(JobVertexID jobVertexId, int subtaskIndex) { + this.executionVertexId = new ExecutionVertexID(jobVertexId, subtaskIndex); + } + + @Override + public ExecutionVertexID getId() { + return executionVertexId; + } + + @Override + public ExecutionState getState() { + return ExecutionState.CREATED; + } + + @Override + public JobVertexID getJobVertexId() { + return executionVertexId.getJobVertexId(); + } + + @Override + public Collection getConsumedResultPartitions() { + return null; + } + + @Override + public Collection getProducedResultPartitions() { + return null; + } Review comment: Formatting looks off here.
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy URL: https://github.com/apache/flink/pull/8296#discussion_r280435403 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link EagerSchedulingStrategy}. + */ +public class EagerSchedulingStrategyTest extends TestLogger { + + /** +* Tests that when start scheduling eager scheduling strategy will start all vertices in scheduling topology. +*/ + @Test + public void testStartScheduling() { + TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology(); + JobVertexID jobVertexID = new JobVertexID(); + testingSchedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertexID, 0)); + testingSchedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertexID, 1)); + testingSchedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertexID, 2)); + testingSchedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertexID, 3)); + testingSchedulingTopology.addSchedulingVertex(new TestingSchedulingVertex(jobVertexID, 4)); + + TestingSchedulerOperation testingSchedulerOperation = new TestingSchedulerOperation(); + EagerSchedulingStrategy schedulingStrategy = new EagerSchedulingStrategy( + testingSchedulerOperation, + testingSchedulingTopology, + null); + + schedulingStrategy.startScheduling(); + + assertEquals(5, testingSchedulerOperation.getScheduledVertices().size()); Review comment: The assertions can be stricter. In particular, we should assert: 1. All vertices in the topology are scheduled at once, i.e., there is only one call to `allocateSlotsAndDeploy()` 2. The deployed ExecutionVertexIDs match with the vertices in the topology. To ease testing, I think it would help to make the constructor of `TestingSchedulingVertex` accept the `ExecutionVertexID` directly: ``` TestingSchedulingVertex(new ExecutionVertexID(...)) ``` Using [hamcrest matchers](https://github.com/junit-team/junit4/wiki/matchers-and-assertthat): ``` assertThat(testingSchedulingOperations..., containsInAnyOrder(executionVertexId1, executionVertexId2)) ```
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy URL: https://github.com/apache/flink/pull/8296#discussion_r280430217 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperation.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * A Simple scheduler operation for testing purposes. + */ +public class TestingSchedulerOperation implements SchedulerOperations { + + private final List scheduledVertices = new ArrayList<>(); Review comment: If we make this a `List<List<ExecutionVertexDeploymentOption>>` our assertions in the tests can be stricter, i.e., we can assert on how many times `allocateSlotsAndDeploy` was called.
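GJL's suggestion — record one inner list per `allocateSlotsAndDeploy` invocation so tests can assert on the call count as well as the contents — can be sketched as follows. Types are simplified to strings for illustration, rather than Flink's `ExecutionVertexDeploymentOption`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RecordingSchedulerOperations {

    // One inner list per allocateSlotsAndDeploy() call, so tests can assert
    // both how many times the method was called and what each call deployed.
    private final List<List<String>> calls = new ArrayList<>();

    public void allocateSlotsAndDeploy(List<String> vertexIds) {
        // Copy defensively so later mutation by the caller cannot corrupt the record.
        calls.add(new ArrayList<>(vertexIds));
    }

    public List<List<String>> getCalls() {
        return Collections.unmodifiableList(calls);
    }
}
```

With this shape, the eager-strategy test can assert `getCalls().size() == 1` (all vertices deployed at once) rather than only checking the flattened vertex count.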
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy URL: https://github.com/apache/flink/pull/8296#discussion_r279718029 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for streaming job which will schedule all tasks at the same time. + */ +public class EagerSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(false); + + public EagerSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology, + JobGraph jobGraph) { Review comment: Let's remove `jobGraph` from the constructor since it's not used inside `EagerSchedulingStrategy`.
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280423763
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java (quoted license header and imports omitted)

```
    public EagerSchedulingStrategy(
            SchedulerOperations schedulerOperations,
            SchedulingTopology schedulingTopology,
            JobGraph jobGraph) {
        this.schedulerOperations = checkNotNull(schedulerOperations);
        this.schedulingTopology = checkNotNull(schedulingTopology);
    }

    @Override
    public void startScheduling() {
        // Schedule all the vertices in scheduling topology at the same time.
        List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>();
```

Review comment: Some code from here is duplicated in `restartTasks`. I would propose to split the methods into smaller units, e.g.:

```
    @Override
    public void startScheduling() {
        final Set<ExecutionVertexID> allVertices = getAllVerticesFromTopology();
        allocateSlotsAndDeploy(allVertices);
    }

    @Override
    public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
        allocateSlotsAndDeploy(verticesNeedingRestart);
    }

    private Set<ExecutionVertexID> getAllVerticesFromTopology() {
        return StreamSupport
            .stream(schedulingTopology.getVertices().spliterator(), false)
            .map(SchedulingVertex::getId)
            .collect(Collectors.toSet());
    }

    private void allocateSlotsAndDeploy(final Set<ExecutionVertexID> allVertices) {
        final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions =
            createExecutionVertexDeploymentOptions(allVertices);
        schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
    }

    private List<ExecutionVertexDeploymentOption> createExecutionVertexDeploymentOptions(
            final Iterable<ExecutionVertexID> verticesNeedingRestart) {
        List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>();
        for (ExecutionVertexID executionVertexID : verticesNeedingRestart) {
            executionVertexDeploymentOptions.add(
                new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
        }
        return executionVertexDeploymentOptions;
    }
```

Feel free to adapt my proposal to your liking.
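The shape of the proposed refactoring can be sketched as a self-contained toy. All types here (`VertexId`, `Topology`, `Operations`, `EagerStrategy`) are simplified stand-ins for the Flink classes discussed in the thread, not the real API; the point is only how `startScheduling` and `restartTasks` share a single `allocateSlotsAndDeploy` path:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class EagerStrategySketch {

    // Minimal stand-in for ExecutionVertexID.
    static final class VertexId {
        final String name;
        VertexId(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // Stand-in for SchedulingTopology: only exposes the vertices.
    interface Topology {
        Iterable<VertexId> getVertices();
    }

    // Stand-in for SchedulerOperations.
    interface Operations {
        void allocateSlotsAndDeploy(List<VertexId> vertices);
    }

    static final class EagerStrategy {
        private final Operations operations;
        private final Topology topology;

        EagerStrategy(Operations operations, Topology topology) {
            this.operations = operations;
            this.topology = topology;
        }

        // Both entry points funnel into allocateSlotsAndDeploy, removing
        // the duplication the review comment points out.
        void startScheduling() {
            allocateSlotsAndDeploy(getAllVerticesFromTopology());
        }

        void restartTasks(Set<VertexId> verticesNeedingRestart) {
            allocateSlotsAndDeploy(verticesNeedingRestart);
        }

        private Set<VertexId> getAllVerticesFromTopology() {
            return StreamSupport
                .stream(topology.getVertices().spliterator(), false)
                .collect(Collectors.toSet());
        }

        private void allocateSlotsAndDeploy(Set<VertexId> vertices) {
            operations.allocateSlotsAndDeploy(new ArrayList<>(vertices));
        }
    }

    public static void main(String[] args) {
        List<VertexId> all = Arrays.asList(new VertexId("a"), new VertexId("b"));
        List<VertexId> deployed = new ArrayList<>();
        EagerStrategy strategy = new EagerStrategy(deployed::addAll, () -> all);
        strategy.startScheduling();
        System.out.println(deployed.size()); // prints 2: all vertices scheduled eagerly
    }
}
```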
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279731809
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java (quoted license header and imports omitted)

```
/**
 * Component responsible for assigning slots to a collection of {@link Execution}.
 */
public interface ExecutionSlotAllocator {
```

Review comment: Let's add this interface under this ticket: https://issues.apache.org/jira/browse/FLINK-12372
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279724405
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java (quoted license header and imports omitted)

```
/**
 * A Simple scheduling topology for testing purposes.
 */
```

Review comment: nit: _A simple [...]_
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280319208
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingVertex.java (quoted license header and imports omitted)

```
/**
 * A Simple scheduling vertex for testing purposes.
 */
public class TestingSchedulingVertex implements SchedulingVertex {

    private final ExecutionVertexID executionVertexId;

    public TestingSchedulingVertex(JobVertexID jobVertexId, int subtaskIndex) {
        this.executionVertexId = new ExecutionVertexID(jobVertexId, subtaskIndex);
    }

    @Override
    public ExecutionVertexID getId() {
        return executionVertexId;
    }

    @Override
    public ExecutionState getState() {
        return ExecutionState.CREATED;
    }

    @Override
    public Collection<SchedulingResultPartition> getProducedResultPartitions() {
        return null;
    }
```

Review comment (on `getProducedResultPartitions`): To avoid handling `null` as a special case, we should just return `Collections.emptyList()` if there are no `SchedulingResultPartitions`.
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279728912
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingVertex.java (quoted license header and imports omitted)

```
    @Override
    public Collection<SchedulingResultPartition> getConsumedResultPartitions() {
        return null;
    }
```

Review comment (on `getConsumedResultPartitions`): To avoid handling `null` as a special case, we should just return `Collections.emptyList()` if there are no `SchedulingResultPartitions`.
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280310271
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java (quoted license header and imports omitted)

```
/**
 * The slot assignment for a {@link ExecutionVertex}.
 */
public class SlotExecutionVertexAssignment {
```

Review comment: Let's add this class under this ticket: https://issues.apache.org/jira/browse/FLINK-12372
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279729385
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java (quoted license header and imports omitted)

```
public class TestingSchedulingTopology implements SchedulingTopology {

    private final List<SchedulingVertex> schedulingVertices = new ArrayList<>();

    @Override
    public Iterable<SchedulingVertex> getVertices() {
        return schedulingVertices;
    }

    @Override
    public Optional<SchedulingVertex> getVertex(ExecutionVertexID executionVertexId) {
        SchedulingVertex returnVertex = null;
        for (SchedulingVertex schedulingVertex : schedulingVertices) {
            if (schedulingVertex.getId().equals(executionVertexId)) {
                returnVertex = schedulingVertex;
                break;
            }
        }
        return Optional.ofNullable(returnVertex);
    }

    @Override
    public Optional<SchedulingResultPartition> getResultPartition(
            IntermediateResultPartitionID intermediateResultPartitionId) {
        return Optional.ofNullable(null);
    }
```

Review comment: I think it's better to honor the contract of this method and provide a _"working"_ implementation:

```
    private final Map<IntermediateResultPartitionID, SchedulingResultPartition> schedulingResultPartitions = new HashMap<>();
```

```
    @Override
    public Optional<SchedulingResultPartition> getResultPartition(
            IntermediateResultPartitionID intermediateResultPartitionId) {
        return Optional.ofNullable(schedulingResultPartitions.get(intermediateResultPartitionId));
    }

    public void addSchedulingVertex(SchedulingVertex schedulingVertex) {
        addResultPartitions(schedulingVertex);
        // ...
    }

    private void addResultPartitions(final SchedulingVertex schedulingVertex) {
        addResultPartitions(schedulingVertex.getConsumedResultPartitions());
        addResultPartitions(schedulingVertex.getProducedResultPartitions());
    }

    private void addResultPartitions(final Collection<SchedulingResultPartition> resultPartitions) {
        for (SchedulingResultPartition schedulingResultPartition : resultPartitions) {
            schedulingResultPartitions.put(schedulingResultPartition.getId(), schedulingResultPartition);
        }
    }
```
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280310078
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirements.java (quoted license header and imports omitted)

```
/**
 * The requirements for scheduling a {@link ExecutionVertex}.
 */
public class ExecutionVertexSchedulingRequirements {
```

Review comment: Let's add this class under this ticket: https://issues.apache.org/jira/browse/FLINK-12372
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279718671
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperation.java (quoted license header and imports omitted)

```
/**
 * A Simple scheduler operation for testing purposes.
 */
public class TestingSchedulerOperation implements SchedulerOperations {

    private final List<ExecutionVertexDeploymentOption> scheduledVertices = new ArrayList<>();

    @Override
    public void allocateSlotsAndDeploy(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
        scheduledVertices.addAll(executionVertexDeploymentOptions);
    }

    public List<ExecutionVertexDeploymentOption> getScheduledVertices() {
        return scheduledVertices;
    }
```

Review comment: Formatting looks off here.
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280430142
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperation.java (quoted license header and imports omitted)

```
    public List<ExecutionVertexDeploymentOption> getScheduledVertices() {
        return scheduledVertices;
    }
```

Review comment: I would prefer to return an immutable collection here to avoid accidental modification.
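The immutability suggestion maps to a one-line change with `Collections.unmodifiableList`. A minimal runnable sketch, with `DeploymentOption` as a hypothetical stand-in for `ExecutionVertexDeploymentOption`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableReturnSketch {

    // Stand-in for ExecutionVertexDeploymentOption.
    static final class DeploymentOption {
        final String vertex;
        DeploymentOption(String vertex) { this.vertex = vertex; }
    }

    private final List<DeploymentOption> scheduledVertices = new ArrayList<>();

    void allocateSlotsAndDeploy(Collection<DeploymentOption> options) {
        scheduledVertices.addAll(options);
    }

    // An unmodifiable view prevents test code from accidentally mutating
    // the recorder's internal state through the returned list.
    List<DeploymentOption> getScheduledVertices() {
        return Collections.unmodifiableList(scheduledVertices);
    }

    public static void main(String[] args) {
        UnmodifiableReturnSketch ops = new UnmodifiableReturnSketch();
        ops.allocateSlotsAndDeploy(Collections.singletonList(new DeploymentOption("v1")));
        List<DeploymentOption> view = ops.getScheduledVertices();
        try {
            view.add(new DeploymentOption("v2"));
            System.out.println("modifiable");
        } catch (UnsupportedOperationException e) {
            System.out.println("unmodifiable"); // prints unmodifiable
        }
        System.out.println(view.size()); // prints 1
    }
}
```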
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280426220
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperation.java (quoted license header and imports omitted)

```
public class TestingSchedulerOperation implements SchedulerOperations {
```

Review comment: To be consistent with the interface, this should be named `TestingSchedulerOperations` (plural).
[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279733104
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerOperations.java (quoted license header and imports omitted)

```
/**
 * Default {@link SchedulerOperations} which will allocate slots and deploy the vertices when all slots are returned.
 */
public class DefaultSchedulerOperations implements SchedulerOperations {
```

Review comment: Nice implementation but I think it is outside of the scope of FLINK-12228. I would propose to save this code and revisit it when we implement the actual scheduler. Also the changes to `Execution` should be reverted for now, and revisited later.