[GitHub] [flink] twalthr commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system
twalthr commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system URL: https://github.com/apache/flink/pull/8360#discussion_r282347087 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java ## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Describes the data type of a value in the table ecosystem. Instances of this class can be used to + * declare input and/or output types of operations. + * + * The {@link DataType} class has two responsibilities: declaring a logical type and giving hints + * about the physical representation of data to the optimizer. While the logical type is mandatory, + * hints are optional but useful at the edges to other APIs. + * + * The logical type is independent of any physical representation and is close to the "data type" + * terminology of the SQL standard. See {@link org.apache.flink.table.types.logical.LogicalType} and + * its subclasses for more information about available logical types and their properties. + * + * Physical hints are required at the edges of the table ecosystem. Hints indicate the data format + * that an implementation expects. For example, a data source could express that it produces values for + * logical timestamps using a {@link java.sql.Timestamp} class instead of using {@link java.time.LocalDateTime}. + * With this information, the runtime is able to convert the produced class into its internal data + * format. In return, a data sink can declare the data format it consumes from the runtime. + * + * @see DataTypes for a list of supported data types and instances of this class. + */ +@PublicEvolving +public abstract class DataType implements Serializable { + + protected LogicalType logicalType; + + protected @Nullable Class conversionClass; + + private DataType(LogicalType logicalType, @Nullable Class conversionClass) { + this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null."); + this.conversionClass = performEarlyClassValidation(logicalType, conversionClass); + } + + /** +* Returns the corresponding logical type. 
+* +* @return a parameterized instance of {@link LogicalType} +*/ + public LogicalType getLogicalType() { + return logicalType; + } + + /** +* Returns the corresponding conversion class for representing values. If no conversion class was +* defined manually, the default conversion defined by the logical type is used. +* +* @see LogicalType#getDefaultConversion() +* +* @return the expected conversion class +*/ + public Class getConversionClass() { + if (conversionClass == null) { + return logicalType.getDefaultConversion(); + } + return conversionClass; + } + + /** +* Adds a hint that null values are not expected in the data for this type. +* +* @return a new, reconfigured data type instance +*/ + public abstract DataType notNull(); + + /** +* Adds a hint that null values are expected in the data for this type (default behavior). +* +* This method exists for explicit declaration of the default behavior or for invalidation of +* a previous call to {@link #notNull()}. +* +* @return a new, reconfigured data type instance +*/ + public abstract DataType andNull(); + + /** +* Adds a hint that data should be represented using
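For illustration, a minimal usage sketch of the class quoted above. The javadoc's timestamp example is shown under the assumption that the conversion-class hint method (whose javadoc is truncated above) is the `bridgedTo(Class)` setter, as in later Flink releases:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class DataTypeHintExample {
    public static void main(String[] args) {
        // Declare TIMESTAMP(3), hinting that values are produced as
        // java.sql.Timestamp rather than the default java.time.LocalDateTime.
        DataType t = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);

        System.out.println(t.getLogicalType());     // logical type: TIMESTAMP(3)
        System.out.println(t.getConversionClass()); // physical hint: java.sql.Timestamp
    }
}
```

The logical type is untouched by the hint; only the expected physical representation changes, which is exactly the two-responsibility split the javadoc describes.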
[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhangminglei reassigned FLINK-9410: --- Assignee: zhangminglei (was: lihongli) > Replace NMClient with NMClientAsync in YarnResourceManager > -- > > Key: FLINK-9410 > URL: https://issues.apache.org/jira/browse/FLINK-9410 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: zhangminglei >Priority: Critical > > Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} > which is called from within the main thread of the {{ResourceManager}}. Since > these operations are blocking, we should replace the client with the > {{NMClientAsync}} and make the calls non-blocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
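For context, a hedged sketch of what the proposed replacement could look like using Hadoop's asynchronous `NMClientAsync` API; the handler and class names are illustrative, not Flink's actual implementation:

```java
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;

// Callbacks run on NMClientAsync's own threads, so the ResourceManager's
// main thread is never blocked on node manager operations.
class IllustrativeNMCallbackHandler implements NMClientAsync.CallbackHandler {
    public void onContainerStarted(ContainerId id, Map<String, ByteBuffer> allServiceResponse) { /* container is up */ }
    public void onContainerStatusReceived(ContainerId id, ContainerStatus status) { }
    public void onContainerStopped(ContainerId id) { }
    public void onStartContainerError(ContainerId id, Throwable t) { /* e.g. release the container */ }
    public void onGetContainerStatusError(ContainerId id, Throwable t) { }
    public void onStopContainerError(ContainerId id, Throwable t) { }
}

class AsyncContainerStarter {
    private final NMClientAsync nmClient;

    AsyncContainerStarter(Configuration yarnConfig) {
        nmClient = NMClientAsync.createNMClientAsync(new IllustrativeNMCallbackHandler());
        nmClient.init(yarnConfig);
        nmClient.start();
    }

    void start(Container container, ContainerLaunchContext ctx) {
        // Returns immediately; success or failure arrives via the handler.
        nmClient.startContainerAsync(container, ctx);
    }
}
```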
[jira] [Created] (FLINK-12456) Wrong CharType conversion in type_utils.py
Jingsong Lee created FLINK-12456: Summary: Wrong CharType conversion in type_utils.py Key: FLINK-12456 URL: https://issues.apache.org/jira/browse/FLINK-12456 Project: Flink Issue Type: Bug Reporter: Jingsong Lee In types.py, CharType is defined as SQL CHAR, but SQL CHAR corresponds to a Java String rather than a Java Character. In type_utils.py, org.apache.flink.api.common.typeinfo.Types.CHAR is mapped to DataTypes.CHAR, which is wrong: Types.CHAR is a Java Character. I suggest we consider removing CHAR support first. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
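To make the mismatch concrete, a hedged Java-side illustration (`DataTypes` refers to the new type system from FLINK-12393 elsewhere in this digest):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class CharMismatch {
    public static void main(String[] args) {
        // Types.CHAR is backed by java.lang.Character, a single UTF-16 unit...
        TypeInformation<Character> javaChar = Types.CHAR;
        // ...while SQL CHAR(n) is a fixed-length *string* of n code points.
        DataType sqlChar = DataTypes.CHAR(10);

        System.out.println(javaChar.getTypeClass());      // class java.lang.Character
        System.out.println(sqlChar.getConversionClass()); // class java.lang.String
    }
}
```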
[GitHub] [flink] wuchong commented on issue #8376: [FLINK-12392] [table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
wuchong commented on issue #8376: [FLINK-12392] [table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12 URL: https://github.com/apache/flink/pull/8376#issuecomment-490752948 Thanks for digging into this. LGTM. Will merge after travis turns green. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8377: [FLINK-12455][python] Move the packaging of pyflink to flink-dist
flinkbot commented on issue #8377: [FLINK-12455][python] Move the packaging of pyflink to flink-dist URL: https://github.com/apache/flink/pull/8377#issuecomment-490752300 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12455) Move the packaging of pyflink to flink-dist
[ https://issues.apache.org/jira/browse/FLINK-12455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12455: --- Labels: pull-request-available (was: ) > Move the packaging of pyflink to flink-dist > --- > > Key: FLINK-12455 > URL: https://issues.apache.org/jira/browse/FLINK-12455 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > > Currently, there is a pom.xml under module flink-python which is responsible > for the packaging of pyflink. The packaging logic should be moved to flink-dist > and then we can remove the pom.xml under flink-python and make flink-python a > pure python module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] dianfu opened a new pull request #8377: [FLINK-12455][python] Move the packaging of pyflink to flink-dist
dianfu opened a new pull request #8377: [FLINK-12455][python] Move the packaging of pyflink to flink-dist URL: https://github.com/apache/flink/pull/8377 ## What is the purpose of the change *This pull request moves the packaging of pyflink to flink-dist for the following reasons: * - *flink-dist is responsible for the packaging of flink, moving the packaging logic there is more straightforward* - *The pom.xml under flink-python can be removed and this makes flink-python a pure python module* ## Brief change log - *Moves the packaging logic to flink-dist* - *Removes the pom.xml under flink-python* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12455) Move the packaging of pyflink to flink-dist
[ https://issues.apache.org/jira/browse/FLINK-12455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu updated FLINK-12455: Summary: Move the packaging of pyflink to flink-dist (was: Move the package of pyflink to flink-dist) > Move the packaging of pyflink to flink-dist > --- > > Key: FLINK-12455 > URL: https://issues.apache.org/jira/browse/FLINK-12455 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > > Currently, there is a pom.xml under module flink-python which is responsible > for the packaging of pyflink. The packaging logic should be moved to flink-dist > and then we can remove the pom.xml under flink-python and make flink-python a > pure python module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282339745 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogBaseTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateCatalogBaseTable(newCatalogTable); + + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + // TODO: [FLINK-12452] alterTable() in all catalogs should ensure existing base table and the new one are of the same type + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + +
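A hedged caller-side sketch of the semantics implemented in the diff above; the path, table, and catalog variables are illustrative:

```java
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

class CatalogUsageSketch {
    static void example(Catalog catalog, CatalogBaseTable table) throws Exception {
        ObjectPath path = new ObjectPath("default", "orders");

        // ignoreIfExists = true: an existing table is silently kept;
        // with false, TableAlreadyExistException is thrown instead.
        catalog.createTable(path, table, true);

        // renameTable checks source and target explicitly, because Hive's
        // alter_table() raises no clear error for either: a missing source
        // throws TableNotExistException (unless ignoreIfNotExists = true),
        // and an existing target always throws TableAlreadyExistException.
        catalog.renameTable(path, "orders_v2", false);
    }
}
```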
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282337775 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) Review comment: This would go against the previous suggestion you made: > Similar to createTable(), I don't see we need to provide implementations for alterTable() in each subclass because I don't really see much difference. If validation is different, then we can define a new interface in the base and have each subclass implement it. How about leave this as is and refactor if necessary in the future? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
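For reference, a minimal sketch of the pattern being weighed here — one shared `createTable()` in the base class with only the validation hook left abstract; class names and the concrete check are illustrative:

```java
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;

abstract class SketchCatalogBase {
    // The only subclass-specific piece: reject tables this catalog cannot store.
    protected abstract void validateCatalogBaseTable(CatalogBaseTable table);

    public void createTable(ObjectPath path, CatalogBaseTable table, boolean ignoreIfExists) {
        validateCatalogBaseTable(table);
        // ... shared Hive metastore logic follows, as in the quoted diff ...
    }
}

class SketchGenericCatalog extends SketchCatalogBase {
    @Override
    protected void validateCatalogBaseTable(CatalogBaseTable table) {
        if (!(table instanceof CatalogTable)) { // illustrative check
            throw new IllegalArgumentException("Only generic Flink tables are supported");
        }
    }
}
```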
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282337926 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +332,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); } + // -- utils -- + + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private Map retrieveFlinkProperties(Map hiveTableParams) { Review comment: I'd just leave them as-is, since I don't see much value of making it static given no other class uses it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
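A hedged sketch of what the helper under discussion does; the `flink.` prefix is an assumption for illustration (the real prefix is defined in the PR), and it is written as `static` since that is exactly the point being debated:

```java
import java.util.Map;
import java.util.stream.Collectors;

class FlinkPropertySketch {
    private static final String FLINK_PROPERTY_PREFIX = "flink."; // assumed prefix

    // Drop Hive-created parameters; keep and un-prefix Flink-created ones.
    static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
        return hiveTableParams.entrySet().stream()
                .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
                .collect(Collectors.toMap(
                        e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()),
                        Map.Entry::getValue));
    }
}
```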
[GitHub] [flink] HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config for yarn configuration
HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config for yarn configuration URL: https://github.com/apache/flink/pull/8303#issuecomment-490741884 @xintongsong @tillrohrmann @rmetzger Thanks for the informative comments. I verified the solution end to end, and it works. I will continue by adding an integration test in the flink-yarn-tests module to this PR later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12455) Move the package of pyflink to flink-dist
Dian Fu created FLINK-12455: --- Summary: Move the package of pyflink to flink-dist Key: FLINK-12455 URL: https://issues.apache.org/jira/browse/FLINK-12455 Project: Flink Issue Type: Sub-task Components: API / Python Reporter: Dian Fu Assignee: Dian Fu Currently, there is a pom.xml under module flink-python which is responsible for the packaging of pyflink. The packaging logic should be moved to flink-dist and then we can remove the pom.xml under flink-python and make flink-python a pure python module. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wuchong commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system
wuchong commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system URL: https://github.com/apache/flink/pull/8360#discussion_r282335432 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java ## @@ -0,0 +1,703 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.AnyType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DateType; +import org.apache.flink.table.types.logical.DayTimeIntervalType; +import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.FloatType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.NullType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.SmallIntType; +import org.apache.flink.table.types.logical.TimeType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.flink.table.types.logical.TypeInformationAnyType; +import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.types.logical.YearMonthIntervalType; +import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution; +import org.apache.flink.table.types.logical.ZonedTimestampType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A {@link 
DataType} can be used to declare input and/or output types of operations. This class + * enumerates all supported data types of the Table & SQL API. + */ +@PublicEvolving +public final class DataTypes { + + // we use SQL-like naming for data types and avoid Java keyword clashes + // CHECKSTYLE.OFF: MethodName + + /** +* Data type of a fixed-length character string {@code CHAR(n)} where {@code n} is the number +* of code points. {@code n} must have a value between 1 and 255 (both inclusive). +* +* @see CharType +*/ + public static DataType.AtomicDataType CHAR(int n) { + return new DataType.AtomicDataType(new CharType(n)); + } + + /** +* Data type of a variable-length character string {@code VARCHAR(n)} where {@code n} is the +* maximum number of code points. {@code n} must have a value between 1 and {@link Integer#MAX_VALUE} +* (both inclusive). +* +* @see VarCharType +*/ + public static DataType.AtomicDataType VARCHAR(int n) { + return new DataType.AtomicDataType(new VarCharType(n)); + } + + /** +* Data type of a variable-length character string with defined maximum length. This is a shortcut +* for {@code VARCHAR(2147483647)} for representing JVM strings. +* +* @see VarCharType +
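A short usage sketch of the factory methods quoted above; `STRING()` is assumed to be the name of the `VARCHAR(2147483647)` shortcut whose javadoc is truncated, and the nullability setter comes from the `DataType` class reviewed elsewhere in this thread:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

class DataTypesUsage {
    static final DataType COUNTRY_CODE = DataTypes.CHAR(2);                 // fixed length, 1 <= n <= 255
    static final DataType NAME         = DataTypes.VARCHAR(255).notNull();  // bounded length, NOT NULL hint
    static final DataType COMMENT      = DataTypes.STRING();                // shortcut for VARCHAR(2147483647)
}
```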
[GitHub] [flink] wuchong commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system
wuchong commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system URL: https://github.com/apache/flink/pull/8360#discussion_r282334560 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java ## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Describes the data type of a value in the table ecosystem. Instances of this class can be used to + * declare input and/or output types of operations. + * + * The {@link DataType} class has two responsibilities: declaring a logical type and giving hints + * about the physical representation of data to the optimizer. While the logical type is mandatory, + * hints are optional but useful at the edges to other APIs. + * + * The logical type is independent of any physical representation and is close to the "data type" + * terminology of the SQL standard. See {@link org.apache.flink.table.types.logical.LogicalType} and + * its subclasses for more information about available logical types and their properties. + * + * Physical hints are required at the edges of the table ecosystem. Hints indicate the data format + * that an implementation expects. For example, a data source could express that it produces values for + * logical timestamps using a {@link java.sql.Timestamp} class instead of using {@link java.time.LocalDateTime}. + * With this information, the runtime is able to convert the produced class into its internal data + * format. In return, a data sink can declare the data format it consumes from the runtime. + * + * @see DataTypes for a list of supported data types and instances of this class. + */ +@PublicEvolving +public abstract class DataType implements Serializable { + + protected LogicalType logicalType; + + protected @Nullable Class conversionClass; + + private DataType(LogicalType logicalType, @Nullable Class conversionClass) { + this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null."); + this.conversionClass = performEarlyClassValidation(logicalType, conversionClass); + } + + /** +* Returns the corresponding logical type. 
+* +* @return a parameterized instance of {@link LogicalType} +*/ + public LogicalType getLogicalType() { + return logicalType; + } + + /** +* Returns the corresponding conversion class for representing values. If no conversion class was +* defined manually, the default conversion defined by the logical type is used. +* +* @see LogicalType#getDefaultConversion() +* +* @return the expected conversion class +*/ + public Class getConversionClass() { + if (conversionClass == null) { + return logicalType.getDefaultConversion(); + } + return conversionClass; + } + + /** +* Adds a hint that null values are not expected in the data for this type. +* +* @return a new, reconfigured data type instance +*/ + public abstract DataType notNull(); + + /** +* Adds a hint that null values are expected in the data for this type (default behavior). +* +* This method exists for explicit declaration of the default behavior or for invalidation of +* a previous call to {@link #notNull()}. +* +* @return a new, reconfigured data type instance +*/ + public abstract DataType andNull(); Review comment: How about rename `andNull()` --> `asNullable()` and
[GitHub] [flink] wuchong commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system
wuchong commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system URL: https://github.com/apache/flink/pull/8360#discussion_r282334300 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java ## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Describes the data type of a value in the table ecosystem. Instances of this class can be used to + * declare input and/or output types of operations. + * + * The {@link DataType} class has two responsibilities: declaring a logical type and giving hints + * about the physical representation of data to the optimizer. While the logical type is mandatory, + * hints are optional but useful at the edges to other APIs. + * + * The logical type is independent of any physical representation and is close to the "data type" + * terminology of the SQL standard. See {@link org.apache.flink.table.types.logical.LogicalType} and + * its subclasses for more information about available logical types and their properties. + * + * Physical hints are required at the edges of the table ecosystem. Hints indicate the data format + * that an implementation expects. For example, a data source could express that it produces values for + * logical timestamps using a {@link java.sql.Timestamp} class instead of using {@link java.time.LocalDateTime}. + * With this information, the runtime is able to convert the produced class into its internal data + * format. In return, a data sink can declare the data format it consumes from the runtime. + * + * @see DataTypes for a list of supported data types and instances of this class. + */ +@PublicEvolving +public abstract class DataType implements Serializable { + + protected LogicalType logicalType; + + protected @Nullable Class conversionClass; + + private DataType(LogicalType logicalType, @Nullable Class conversionClass) { + this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null."); + this.conversionClass = performEarlyClassValidation(logicalType, conversionClass); + } + + /** +* Returns the corresponding logical type. 
+* +* @return a parameterized instance of {@link LogicalType} +*/ + public LogicalType getLogicalType() { + return logicalType; + } + + /** +* Returns the corresponding conversion class for representing values. If no conversion class was +* defined manually, the default conversion defined by the logical type is used. +* +* @see LogicalType#getDefaultConversion() +* +* @return the expected conversion class +*/ + public Class getConversionClass() { + if (conversionClass == null) { + return logicalType.getDefaultConversion(); + } + return conversionClass; + } + + /** +* Adds a hint that null values are not expected in the data for this type. +* +* @return a new, reconfigured data type instance +*/ + public abstract DataType notNull(); + + /** +* Adds a hint that null values are expected in the data for this type (default behavior). +* +* This method exists for explicit declaration of the default behavior or for invalidation of +* a previous call to {@link #notNull()}. +* +* @return a new, reconfigured data type instance +*/ + public abstract DataType andNull(); + + /** +* Adds a hint that data should be represented using
[GitHub] [flink] chunweilei commented on a change in pull request #8266: [FLINK-9904]Allow users to control MaxDirectMemorySize
chunweilei commented on a change in pull request #8266: [FLINK-9904]Allow users to control MaxDirectMemorySize URL: https://github.com/apache/flink/pull/8266#discussion_r282333076 ## File path: flink-dist/src/main/flink-bin/bin/config.sh ## @@ -429,6 +430,11 @@ if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX}) fi +# Define FLINK_TM_MAX_OFFHEAP_SIZE if it is not already set +if [ -z "${FLINK_TM_MAX_OFFHEAP_SIZE}" ]; then +# default: Long.MAX_VALUE in TB +FLINK_TM_MAX_OFFHEAP_SIZE=$(readFromConfig ${KEY_TASKM_MAX_OFFHEAP_SIZE} "8388607T" "${YAML_CONF}") Review comment: Maybe it is set to -1 by the user incorrectly. You can find other configurations like `FLINK_TM_NET_BUF_MAX` that use this condition too. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#issuecomment-490734729 Hi @GJL @tillrohrmann, I have updated the PR based on the latest master. Would you mind beginning another round of code review? As discussed in the first round of code review, there are still some open questions; I have summarized them here for convenience. Any comment is highly appreciated.
1. [[SchedulePolicy](https://github.com/apache/flink/pull/8309#discussion_r281049742)] Using vertex state transitions to schedule vertices has the benefit of avoiding a flood of `onPartitionConsumable` notifications, but consumers of PIPELINED results may sit idle waiting for data. So I think we could keep the benefit by relying on both vertex state transitions and `onPartitionConsumable` notifications: 1) set `DeploymentOption#sendScheduleOrUpdateConsumerMessage` to true if the vertex has a PIPELINED produced result partition, and to false if all its produced result partitions are BLOCKING; 2) schedule vertices whose input result partitions are BLOCKING via vertex state transitions; 3) schedule vertices with PIPELINED input result partitions via `onPartitionConsumable` notifications.
2. [[JobGraph Usage](https://github.com/apache/flink/pull/8309#discussion_r281037134)] The only usage of `JobGraph` is to provide the `InputDependencyConstraint` in `LazyFromSourcesSchedulingStrategy`; it is not used in `EagerSchedulingStrategy`. Maybe we could remove `JobGraph` from `SchedulingStrategyFactory#createInstance` and move the `InputDependencyConstraint` information into `SchedulingTopology`, which needs a new method there: `InputDependencyConstraint getInputDependencyConstraint(JobVertexID jobVertexId)`?
3. [[ANY/ALL Schedule Granularity](https://issues.apache.org/jira/browse/FLINK-12229)] In the original scheduler, the schedule granularity is ANY/ALL of the IntermediateDataSet finishing. Scheduling at result-partition granularity could speed up deployments, but may cause a flood of partition-update network messages and risk resource deadlock; thus my implementation in this PR is consistent with the original logic. However, we are wondering whether we could keep the deployment-speedup advantage while still avoiding the partition-update flood and resource deadlock. Based on our production experience, we propose to introduce a new trigger, `InputDependencyConstraint#Progress`: a float between 0.0 and 1.0 identifying the fraction of finished input result partitions (see the sketch below this message). 1.0 means ALL input result partitions must finish; we configured it to 0.8 by default in our production system to balance the speedup advantage against partition-update floods and possible resource deadlocks.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
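A minimal sketch of the `Progress` trigger proposed in point 3 above, under the stated assumptions (names illustrative):

```java
// Fulfilled once the fraction of finished input result partitions reaches
// the threshold; 1.0 degenerates to ALL, and 0.8 is the production default
// the authors mention.
final class ProgressConstraint {
    private final double threshold; // in [0.0, 1.0]

    ProgressConstraint(double threshold) {
        this.threshold = threshold;
    }

    boolean isFulfilled(int finishedPartitions, int totalPartitions) {
        return totalPartitions == 0
                || (double) finishedPartitions / totalPartitions >= threshold;
    }
}
// e.g. new ProgressConstraint(0.8).isFulfilled(4, 5) returns true
```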
[jira] [Updated] (FLINK-12454) Add custom check options for lint-python.sh
[ https://issues.apache.org/jira/browse/FLINK-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12454: Labels: (was: Python) > Add custom check options for lint-python.sh > --- > > Key: FLINK-12454 > URL: https://issues.apache.org/jira/browse/FLINK-12454 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > > At present, lint-python.sh has Python compatibility checks and code style > checks. By default, all checks will be executed. In the > development stage, sometimes only the code style check is required. Therefore, > in this JIRA, options for specifying which checks to run are added to > lint-python.sh. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12454) Add custom check options for lint-python.sh
[ https://issues.apache.org/jira/browse/FLINK-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12454: Labels: Python (was: ) > Add custom check options for lint-python.sh > --- > > Key: FLINK-12454 > URL: https://issues.apache.org/jira/browse/FLINK-12454 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Labels: Python > > At present, lint-python.sh has Python compatibility checks and code style > checks. By default, all checks will be executed. In the > development stage, sometimes only the code style check is required. Therefore, > in this JIRA, options for specifying which checks to run are added to > lint-python.sh. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12454) Add custom check options for lint-python.sh
[ https://issues.apache.org/jira/browse/FLINK-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12454: Description: At present, lint-python.sh has Python compatibility checks and code style checks. By default, all checks will be executed. In the development stage, sometimes only the code style check is required. Therefore, in this JIRA, options for specifying which checks to run are added to lint-python.sh. (was: At present, lint-python.sh has Python compatibility checks and code style checks. By default, all checks will be executed. In the development stage, only one check may be required in some cases. Therefore, in this JIRA, the specified check item parameters are added for lint-python.sh.) > Add custom check options for lint-python.sh > --- > > Key: FLINK-12454 > URL: https://issues.apache.org/jira/browse/FLINK-12454 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > > At present, lint-python.sh has Python compatibility checks and code style > checks. By default, all checks will be executed. In the > development stage, sometimes only the code style check is required. Therefore, > in this JIRA, options for specifying which checks to run are added to > lint-python.sh. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12454) Add custom check options for lint-python.sh
sunjincheng created FLINK-12454: --- Summary: Add custom check options for lint-python.sh Key: FLINK-12454 URL: https://issues.apache.org/jira/browse/FLINK-12454 Project: Flink Issue Type: Sub-task Components: API / Python Affects Versions: 1.9.0 Reporter: sunjincheng Assignee: sunjincheng At present, lint-python.sh has Python compatibility checks and code style checks. By default, all checks will be executed. In the development stage, only one check may be required in some cases. Therefore, in this JIRA, the specified check item parameters are added for lint-python.sh. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] eaglewatcherwb commented on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb commented on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#issuecomment-490734729 Hi @GJL @tillrohrmann, I have updated the PR based on the latest master. Would you mind beginning another round of code review? As discussed in the first round of code review, there are still some open questions; I have summarized them here for convenience.
1. [[SchedulePolicy](https://github.com/apache/flink/pull/8309#discussion_r281049742)] Using vertex state transitions to schedule vertices has the benefit of avoiding a flood of `onPartitionConsumable` notifications, but consumers of PIPELINED results may sit idle waiting for data. So I think we could keep the benefit by relying on both vertex state transitions and `onPartitionConsumable` notifications: 1) set `DeploymentOption#sendScheduleOrUpdateConsumerMessage` to true if the vertex has a PIPELINED produced result partition, and to false if all its produced result partitions are BLOCKING; 2) schedule vertices whose input result partitions are BLOCKING via vertex state transitions; 3) schedule vertices with PIPELINED input result partitions via `onPartitionConsumable` notifications.
2. [[JobGraph Usage](https://github.com/apache/flink/pull/8309#discussion_r281037134)] The only usage of `JobGraph` is to provide the `InputDependencyConstraint` in `LazyFromSourcesSchedulingStrategy`; it is not used in `EagerSchedulingStrategy`. Maybe we could remove `JobGraph` from `SchedulingStrategyFactory#createInstance` and move the `InputDependencyConstraint` information into `SchedulingTopology`, which needs a new method there: `InputDependencyConstraint getInputDependencyConstraint(JobVertexID jobVertexId)`?
3. [[ANY/ALL Schedule Granularity](https://issues.apache.org/jira/browse/FLINK-12229)] In the original scheduler, the schedule granularity is ANY/ALL of the IntermediateDataSet finishing. Scheduling at result-partition granularity could speed up deployments, but may cause a flood of partition-update network messages and risk resource deadlock; thus my implementation in this PR is consistent with the original logic. However, we are wondering whether we could keep the deployment-speedup advantage while still avoiding the partition-update flood and resource deadlock. Based on our production experience, we propose to introduce a new trigger, `InputDependencyConstraint#Progress`: a float between 0.0 and 1.0 identifying the fraction of finished input result partitions. 1.0 means ALL input result partitions must finish; we configured it to 0.8 by default to balance the speedup advantage against partition-update floods and possible resource deadlocks.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy
eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy URL: https://github.com/apache/flink/pull/8309#issuecomment-490734729 Hi @GJL @tillrohrmann, I have updated the PR based on the latest master. Would you mind beginning another round of code review? As discussed in the first round of code review, there are still some open questions; I have summarized them here for convenient discussion. Any comment is highly appreciated. 1. [[SchedulePolicy](https://github.com/apache/flink/pull/8309#discussion_r281049742)] Scheduling vertices via vertex state transitions has the benefit of avoiding a flood of `onPartitionConsumable` notifications, but results of PIPELINED shuffle mode may sit idle waiting to be consumed. So I think we could keep the benefit by relying on both vertex state transitions and `onPartitionConsumable` notifications: 1) set `DeploymentOption#sendScheduleOrUpdateConsumerMessage` to true if the vertex has a PIPELINED produced result partition, and to false if all produced result partitions are BLOCKING; 2) schedule vertices with BLOCKING input result partitions using vertex state transitions; 3) schedule vertices with PIPELINED input result partitions using `onPartitionConsumable` notifications. 2. [[JobGraph Usage](https://github.com/apache/flink/pull/8309#discussion_r281037134)] The only usage of `JobGraph` is to provide the `InputDependencyConstraint` in `LazyFromSourcesSchedulingStrategy`; it is not used in `EagerSchedulingStrategy`. Maybe we could remove `JobGraph` from `SchedulingStrategyFactory#createInstance` and add the `InputDependencyConstraint` information to `SchedulingTopology`, which needs a new interface method in `SchedulingTopology`: `InputDependencyConstraint getInputDependencyConstraint(JobVertexID jobVertexId)`? 3. [[ANY/ALL Schedule Granularity](https://issues.apache.org/jira/browse/FLINK-12229)] In the original scheduler, the scheduling granularity is ANY/ALL of the IntermediateDataSet finishing; scheduling at result-partition granularity could speed up deployments, but it may cause a flood of partition-update network communication and resource deadlock. Thus, my implementation in this PR is consistent with the original logic. However, we wonder whether we could keep the deployment speedup while still avoiding the partition-update flood and resource deadlock. Based on our production experience, we propose introducing a new trigger, `InputDependencyConstraint#Progress`, a float between 0.0 and 1.0 that identifies the percentage of finished input result partitions. 1.0 means ALL the input result partitions have finished; we configured 0.8 as the default to balance the speedup advantage against the partition-update flood and possible resource deadlock. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
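The `SchedulingTopology` extension proposed in point 2 above could look like the following minimal Java sketch. Only the method signature comes from the comment itself; the package and the surrounding interface body are assumptions for illustration:

```java
package org.apache.flink.runtime.scheduler.strategy; // package is an assumption of this sketch

import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.jobgraph.JobVertexID;

/**
 * Sketch of the proposed extension: expose the input dependency constraint
 * directly, so that scheduling strategies no longer need the JobGraph.
 */
public interface SchedulingTopology {

	/** Returns the ANY/ALL input dependency constraint of the given job vertex. */
	InputDependencyConstraint getInputDependencyConstraint(JobVertexID jobVertexId);
}
```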
[GitHub] [flink] godfreyhe opened a new pull request #8376: [FLINK-12392] [table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
godfreyhe opened a new pull request #8376: [FLINK-12392] [table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12 URL: https://github.com/apache/flink/pull/8376 ## What is the purpose of the change *Port FlinkRelMetadataQuery into Java to avoid a compile error with Scala 2.12* ## Brief change log - *Port FlinkRelMetadataQuery from Scala into Java* ## Verifying this change This change is already covered by existing tests, such as *FlinkRelMdXXXTest*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-12392) FlinkRelMetadataQuery does not compile with Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-12392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12392: --- Labels: pull-request-available (was: ) > FlinkRelMetadataQuery does not compile with Scala 2.12 > -- > > Key: FLINK-12392 > URL: https://issues.apache.org/jira/browse/FLINK-12392 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: godfrey he >Priority: Blocker > Labels: pull-request-available > Fix For: 1.9.0 > > > {code} > 10:57:51.770 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala:52: > error: value EMPTY in class RelMetadataQuery cannot be accessed in object > org.apache.calcite.rel.metadata.RelMetadataQuery > 10:57:51.770 [ERROR] Access to protected value EMPTY not permitted because > 10:57:51.770 [ERROR] enclosing package metadata in package plan is not a > subclass of > 10:57:51.770 [ERROR] class RelMetadataQuery in package metadata where target > is defined > 10:57:51.770 [ERROR] this(RelMetadataQuery.THREAD_PROVIDERS.get, > RelMetadataQuery.EMPTY) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
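For context, the error above occurs because Scala 2.12 refuses to access the protected static members `THREAD_PROVIDERS` and `EMPTY` of Calcite's `RelMetadataQuery` from a non-subclass package, while Java's access rules let a subclass reference them; that is why porting the class to Java sidesteps the problem. A minimal illustrative sketch (the constructor arguments are taken from the error message above; everything else is assumed):

```java
package org.apache.flink.table.plan.metadata;

import org.apache.calcite.rel.metadata.RelMetadataQuery;

/**
 * Illustrative only: in Java, a subclass may refer to the protected static
 * members THREAD_PROVIDERS and EMPTY of RelMetadataQuery, which the
 * Scala 2.12 compiler rejects from this package.
 */
public class FlinkRelMetadataQuery extends RelMetadataQuery {

	private FlinkRelMetadataQuery() {
		super(THREAD_PROVIDERS.get(), EMPTY);
	}
}
```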
[GitHub] [flink] flinkbot commented on issue #8376: [FLINK-12392] [table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
flinkbot commented on issue #8376: [FLINK-12392] [table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12 URL: https://github.com/apache/flink/pull/8376#issuecomment-490732935 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-12392) FlinkRelMetadataQuery does not compile with Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-12392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] godfrey he reassigned FLINK-12392: -- Assignee: godfrey he > FlinkRelMetadataQuery does not compile with Scala 2.12 > -- > > Key: FLINK-12392 > URL: https://issues.apache.org/jira/browse/FLINK-12392 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Assignee: godfrey he >Priority: Blocker > Fix For: 1.9.0 > > > {code} > 10:57:51.770 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala:52: > error: value EMPTY in class RelMetadataQuery cannot be accessed in object > org.apache.calcite.rel.metadata.RelMetadataQuery > 10:57:51.770 [ERROR] Access to protected value EMPTY not permitted because > 10:57:51.770 [ERROR] enclosing package metadata in package plan is not a > subclass of > 10:57:51.770 [ERROR] class RelMetadataQuery in package metadata where target > is defined > 10:57:51.770 [ERROR] this(RelMetadataQuery.THREAD_PROVIDERS.get, > RelMetadataQuery.EMPTY) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wenhuitang commented on issue #7805: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs.
wenhuitang commented on issue #7805: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs. URL: https://github.com/apache/flink/pull/7805#issuecomment-490729884 Opened another PR (https://github.com/apache/flink/pull/8375) because this PR is from an unknown repository, so I am closing this one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8375: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs.
flinkbot commented on issue #8375: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs. URL: https://github.com/apache/flink/pull/8375#issuecomment-490729901 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wenhuitang closed pull request #7805: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs.
wenhuitang closed pull request #7805: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs. URL: https://github.com/apache/flink/pull/7805 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wenhuitang opened a new pull request #8375: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs.
wenhuitang opened a new pull request #8375: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs. URL: https://github.com/apache/flink/pull/8375 ## What is the purpose of the change This pull request adds the readMapReduceTextFile API for HadoopInputs. https://issues.apache.org/jira/browse/FLINK-11697 ## Brief change log Adds readMapReduceTextFile for both the Java and Scala APIs. Adds HadoopTextFormatITCase.java for mapreduce. Adds HadoopTextFormatMapreduceITCase.scala. ## Verifying this change This change is already covered by existing tests, such as HadoopTextFormatITCase and HadoopTextFormatMapreduceITCase. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
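A usage sketch of the API this PR proposes, assuming `readMapReduceTextFile` mirrors the existing `HadoopInputs.readHadoopFile` helpers and yields `(LongWritable, Text)` pairs from the mapreduce `TextInputFormat`; the method's exact signature is defined by the PR itself, so treat the call and the path as illustrative:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class ReadTextFileExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Hypothetical call to the API added in this PR; the argument and
		// return type are assumptions based on the PR description.
		DataSet<Tuple2<LongWritable, Text>> lines =
			env.createInput(HadoopInputs.readMapReduceTextFile("hdfs:///tmp/input"));

		lines.print();
	}
}
```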
[jira] [Updated] (FLINK-12453) StreamExecGlobalGroupAggregate construct wrong args for AggsHandlerCodeGenerator
[ https://issues.apache.org/jira/browse/FLINK-12453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jingsong Lee updated FLINK-12453: - Description: In generateAggsHandler:

val generator = new AggsHandlerCodeGenerator(
  CodeGeneratorContext(config),
  relBuilder,
  FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
  needRetract = false,
  config.getNullCheck,
  inputFieldCopy)

but AggsHandlerCodeGenerator args is:

class AggsHandlerCodeGenerator(
  ctx: CodeGeneratorContext,
  relBuilder: RelBuilder,
  inputFieldTypes: Seq[InternalType],
  needRetract: Boolean,
  copyInputField: Boolean,
  needAccumulate: Boolean = true)

Same issue to StreamExecIncrementalGroupAggregate

was: In generateAggsHandler:

val generator = new AggsHandlerCodeGenerator(
  CodeGeneratorContext(config),
  relBuilder,
  FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
  needRetract = false,
  config.getNullCheck,
  inputFieldCopy)

but AggsHandlerCodeGenerator args is:

class AggsHandlerCodeGenerator(
  ctx: CodeGeneratorContext,
  relBuilder: RelBuilder,
  inputFieldTypes: Seq[InternalType],
  needRetract: Boolean,
  copyInputField: Boolean,
  needAccumulate: Boolean = true)

> StreamExecGlobalGroupAggregate construct wrong args for
> AggsHandlerCodeGenerator
>
> Key: FLINK-12453
> URL: https://issues.apache.org/jira/browse/FLINK-12453
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Jingsong Lee
> Assignee: Jark Wu
> Priority: Major
>
> In generateAggsHandler:
> val generator = new AggsHandlerCodeGenerator(
> CodeGeneratorContext(config),
> relBuilder,
> FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
> needRetract = false,
> config.getNullCheck,
> inputFieldCopy)
> but AggsHandlerCodeGenerator args is:
> class AggsHandlerCodeGenerator(
> ctx: CodeGeneratorContext,
> relBuilder: RelBuilder,
> inputFieldTypes: Seq[InternalType],
> needRetract: Boolean,
> copyInputField: Boolean,
> needAccumulate: Boolean = true)
> Same issue to StreamExecIncrementalGroupAggregate

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] wenhuitang commented on issue #7810: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs.
wenhuitang commented on issue #7810: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs. URL: https://github.com/apache/flink/pull/7810#issuecomment-490729368 Opened another PR (https://github.com/apache/flink/pull/8374) because this PR is from an unknown repository, so I am closing this one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wenhuitang closed pull request #7810: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs.
wenhuitang closed pull request #7810: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs. URL: https://github.com/apache/flink/pull/7810 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12453) StreamExecGlobalGroupAggregate construct wrong args for AggsHandlerCodeGenerator
Jingsong Lee created FLINK-12453: Summary: StreamExecGlobalGroupAggregate construct wrong args for AggsHandlerCodeGenerator Key: FLINK-12453 URL: https://issues.apache.org/jira/browse/FLINK-12453 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Jingsong Lee Assignee: Jark Wu In generateAggsHandler:

val generator = new AggsHandlerCodeGenerator(
  CodeGeneratorContext(config),
  relBuilder,
  FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
  needRetract = false,
  config.getNullCheck,
  inputFieldCopy)

but AggsHandlerCodeGenerator args is:

class AggsHandlerCodeGenerator(
  ctx: CodeGeneratorContext,
  relBuilder: RelBuilder,
  inputFieldTypes: Seq[InternalType],
  needRetract: Boolean,
  copyInputField: Boolean,
  needAccumulate: Boolean = true)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot commented on issue #8374: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs.
flinkbot commented on issue #8374: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs. URL: https://github.com/apache/flink/pull/8374#issuecomment-490729202 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands: The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wenhuitang opened a new pull request #8374: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs.
wenhuitang opened a new pull request #8374: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs. URL: https://github.com/apache/flink/pull/8374 ## What is the purpose of the change This pull request adds the readMapRedTextFile API for HadoopInputs. https://issues.apache.org/jira/browse/FLINK-11698 ## Brief change log Adds readMapRedTextFile for both the Java and Scala APIs. Adds HadoopTextFormatITCase.java for mapred. Adds HadoopTextFormatMapredITCase.scala. ## Verifying this change This change is already covered by existing tests, such as HadoopTextFormatITCase and HadoopTextFormatMapredITCase. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282322682 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogBaseTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateCatalogBaseTable(newCatalogTable); + + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + // TODO: [FLINK-12452] alterTable() in all catalogs should ensure existing base table and the new one are of the same type + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + +
[jira] [Assigned] (FLINK-12375) flink-container job jar does not have read permissions
[ https://issues.apache.org/jira/browse/FLINK-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang reassigned FLINK-12375: Assignee: Yun Tang > flink-container job jar does not have read permissions > -- > > Key: FLINK-12375 > URL: https://issues.apache.org/jira/browse/FLINK-12375 > Project: Flink > Issue Type: Bug > Components: Deployment / Docker >Reporter: Adam Lamar >Assignee: Yun Tang >Priority: Major > > When building a custom job container using flink-container, the job can't be > launched if the provided job jar does not have world-readable permission. > This is because the job jar in the container is owned by root:root, but the > docker container executes as the flink user. > In environments with restrictive umasks (e.g. company laptops) that create > files without group and other read permissions by default, this causes the > instructions to fail. > To reproduce on master: > {code:java} > cd flink-container/docker > cp ../../flink-examples/flink-examples-streaming/target/WordCount.jar . > chmod go-r WordCount.jar # still maintain user read permission > ./build.sh --job-jar WordCount.jar --from-archive > flink-1.8.0-bin-scala_2.11.tgz --image-name flink-job:latest > FLINK_DOCKER_IMAGE_NAME=flink-job > FLINK_JOB=org.apache.flink.streaming.examples.wordcount.WordCount > docker-compose up{code} > which results in the following error: > {code:java} > job-cluster_1 | 2019-04-30 18:40:57,787 ERROR > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not start > cluster entrypoint StandaloneJobClusterEntryPoint. > job-cluster_1 | > org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to > initialize the cluster entrypoint StandaloneJobClusterEntryPoint. > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:535) > job-cluster_1 | at > org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:105) > job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not > create the DispatcherResourceManagerComponent. > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:257) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172) > job-cluster_1 | at > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171) > job-cluster_1 | ... 2 more > job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not > load the provided entrypoint class. 
> job-cluster_1 | at > org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:119) > job-cluster_1 | at > org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:96) > job-cluster_1 | at > org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:62) > job-cluster_1 | at > org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:41) > job-cluster_1 | at > org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:184) > job-cluster_1 | ... 6 more > job-cluster_1 | Caused by: java.lang.ClassNotFoundException: > org.apache.flink.streaming.examples.wordcount.WordCount > job-cluster_1 | at java.net.URLClassLoader.findClass(URLClassLoader.java:382) > job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > job-cluster_1 | at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > job-cluster_1 | at > org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:116) > job-cluster_1 | ... 10 more{code} > This issue can be fixed by chown'ing the job.jar file to flink:flink in the > Dockerfile. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282321982 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + protected abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) Review comment: Even though this is not a public API, I feel it should be more formal. Specifically, we could probably let this return true or false as the validation result, rather than relying on throwing IllegalArgumentException. Secondly, validation can be much more complicated than just checking the instance type. At this level, it shouldn't care what kind of validation the subclasses might do. I think it's acceptable if the base class doesn't care whether the subclass does validation or how it does it. So it's okay if we remove this and let each subclass do its validation when creating/altering tables. This way might be even cleaner. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
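The alternative described in the review comment, returning a validation result instead of throwing `IllegalArgumentException`, might look like this minimal sketch; the method name and the example predicate are hypothetical:

```java
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;

/**
 * Hypothetical boolean-returning variant of validateCatalogBaseTable(),
 * letting each caller decide how to report an invalid table.
 */
public abstract class ValidatingCatalogBase {

	protected boolean isValidCatalogBaseTable(CatalogBaseTable table) {
		// Example predicate only; real subclasses may check far more
		// than the instance type.
		return table instanceof CatalogTable;
	}
}
```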
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282322716 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable); + } + } catch (TException e) { + throw new CatalogException( +
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282322682 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogBaseTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateCatalogBaseTable(newCatalogTable); + + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + // TODO: [FLINK-12452] alterTable() in all catalogs should ensure existing base table and the new one are of the same type + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + +
[GitHub] [flink] tianchen92 commented on a change in pull request #8266: [FLINK-9904]Allow users to control MaxDirectMemorySize
tianchen92 commented on a change in pull request #8266: [FLINK-9904]Allow users to control MaxDirectMemorySize URL: https://github.com/apache/flink/pull/8266#discussion_r282321092 ## File path: flink-dist/src/main/flink-bin/bin/config.sh ## @@ -429,6 +430,11 @@ if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX}) fi

+# Define FLINK_TM_MAX_OFFHEAP_SIZE if it is not already set
+if [ -z "${FLINK_TM_MAX_OFFHEAP_SIZE}" ]; then
+# default: Long.MAX_VALUE in TB
+FLINK_TM_MAX_OFFHEAP_SIZE=$(readFromConfig ${KEY_TASKM_MAX_OFFHEAP_SIZE} "8388607T" "${YAML_CONF}")

Review comment: I am not sure. In what case would ${FLINK_TM_MAX_OFFHEAP_SIZE} be equal to -1? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282320563 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +332,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); }

+ // -- utils --
+
+ /**
+ * Filter out Hive-created properties, and return Flink-created properties.
+ */
+ private Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
+ return hiveTableParams.entrySet().stream()
+ .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+ .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+ }
+
+ /**
+ * Add a prefix to Flink-created properties to distinguish them from Hive-created properties.
+ */
+ private Map<String, String> maskFlinkProperties(Map<String, String> properties) {

Review comment: Static? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282320535 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +332,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); }

+ // -- utils --
+
+ /**
+ * Filter out Hive-created properties, and return Flink-created properties.
+ */
+ private Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {

Review comment: static? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
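Since neither helper touches instance state, making them static as the reviewer suggests is a one-word change; a sketch with the `<String, String>` generics restored (the mailing-list rendering drops angle brackets) and the prefix value assumed for illustration:

```java
import java.util.Map;
import java.util.stream.Collectors;

public final class FlinkPropertyUtil {

	// The real constant lives in the catalog class; the value here is an assumption.
	private static final String FLINK_PROPERTY_PREFIX = "flink.";

	/** Filter out Hive-created properties and return Flink-created ones. */
	static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
		return hiveTableParams.entrySet().stream()
			.filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
			.collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), Map.Entry::getValue));
	}

	/** Prefix Flink-created properties to distinguish them from Hive-created ones. */
	static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
		return properties.entrySet().stream()
			.collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), Map.Entry::getValue));
	}
}
```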
[jira] [Commented] (FLINK-12437) Taskmanager doesn't initiate registration after jobmanager marks it terminated
[ https://issues.apache.org/jira/browse/FLINK-12437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836032#comment-16836032 ] Abdul Qadeer commented on FLINK-12437: -- [~rmetzger] I don't expect a fix to be made available for this in 1.4.0. I would like to know if this is a known issue fixed in newer versions. > Taskmanager doesn't initiate registration after jobmanager marks it terminated > -- > > Key: FLINK-12437 > URL: https://issues.apache.org/jira/browse/FLINK-12437 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Reporter: Abdul Qadeer >Priority: Major > > This issue is observed in Standalone cluster deployment mode with Zookeeper > HA enabled in Flink 1.4.0. A few taskmanagers restarted due to Out of > Metaspace. > The offending taskmanager `pipelineruntime-taskmgr-6789dd578b-dcp4r` first > successfully registers with jobmanager, and the remote watcher marks it > terminated soon after as seen in logs. There were other taskmanagers that > were terminated around same time but they had been quarantined by jobmanager > with message similar to: > {noformat} > Association to [akka.tcp://flink@10.60.5.121:8070] having UID [864976677] is > irrecoverably failed. UID is now quarantined and all messages to this UID > will be delivered to dead letters. Remote actorsystem must be restarted to > recover from this situation. > {noformat} > They came back up and successfully registered with jobmanager. This didn't > happen for the offending taskmanager: > > At JobManager: > {noformat} > {"timeMillis":1557073368155,"thread":"flink-akka.actor.default-dispatcher-49","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Registered > TaskManager at pipelineruntime-taskmgr-6789dd578b-dcp4r > (akka.tcp://flink@10.60.5.85:8070/user/taskmanager) as > ae61ac607f0ab35ab5066f7dc221e654. Current number of registered hosts is 8. > Current number of alive task slots is > 51.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":125,"threadPriority":5} > ... > ... > {"timeMillis":1557073391386,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered > task manager /10.60.5.85. Number of registered task managers 7. Number of > available slots > 45.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5} > ... > ... > {"timeMillis":1557073391483,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered > task manager /10.60.5.85. Number of registered task managers 6. Number of > available slots > 39.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5} > ... > ... > {"timeMillis":1557073370389,"thread":"flink-akka.actor.default-dispatcher-35","level":"INFO","loggerName":"akka.actor.LocalActorRef","message":"Message > [akka.remote.ReliableDeliverySupervisor$Ungate$] from > Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260] > to > Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260] > was not delivered. [22] dead letters encountered. 
This logging can be turned > off or adjusted with configuration settings 'akka.log-dead-letters' and > 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":98,"threadPriority":5} > {noformat} > At TaskManager: > {noformat} > {"timeMillis":1557073366068,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting > > TaskManager","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5} > {"timeMillis":1557073366073,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting > TaskManager actor system at > 10.60.5.85:8070.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5} > {"timeMillis":1557073366077,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying > to start actor system at > 10.60.5.85:8070","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5} >
[GitHub] [flink] sunhaibotb edited a comment on issue #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface
sunhaibotb edited a comment on issue #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface URL: https://github.com/apache/flink/pull/8124#issuecomment-490388025 @pnowojski, the code has been updated and it relies on the latest version of [PR-7959](https://github.com/apache/flink/pull/7959). - The throughput gap between `StreamTwoInputSelectableProcessor` and `StreamTwoInputProcessor` dropped from 3.51% to 2.88%. That is to say, **`StreamTwoInputSelectableProcessor` is still 2.88% slower than `StreamTwoInputProcessor`**. - Benchmark results (the benchmark was run three times and the results averaged): `StreamTwoInputSelectableProcessor`: 23151 ops/ms; `StreamTwoInputProcessor`: 23839 ops/ms. - If the code that deserializes elements from the buffers is put into `StreamTwoInputSelectableProcessor#processInput()`, performance improves greatly (I tested this before; see below for the code structure). But this deviates from the purpose of abstracting the `Input` interface. **Is digging into JIT optimization the next direction? Or is the performance regression acceptable under the current structure?** ```
=== StreamTwoInputXProcessor.java ===

public boolean processInput() throws Exception {
	while (true) {
		...
		if (currentRecordDeserializer != null) {
			DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate1);
			if (result.isFullRecord()) {
				element = deserializationDelegate1.getInstance();
			}
			...
	}
}
``` - The code related to `Input#listen()` is temporary and has not yet been rebased on your [PR-8361](https://github.com/apache/flink/pull/8361). Replacing listeners with `CompletableFuture` in `InputGates` is a very good design, and the old interface is really awkward for `Input`. **Next, should I rebase this PR on top of your [PR-8361](https://github.com/apache/flink/pull/8361)?** P.S. the benchmark code has been split out into the new [PR-8368](https://github.com/apache/flink/pull/8368). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
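For readers following this thread, a minimal sketch of the `InputSelectable` interface being wired up here (FLINK-11877): `nextSelection()` and `InputSelection.FIRST`/`SECOND` come from the feature's design, while the alternating class body is illustrative only:

```java
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;

/**
 * Illustrative selector that alternates reads between the two inputs
 * of a two-input operator.
 */
public class AlternatingInputSelector implements InputSelectable {

	private boolean readFirst;

	@Override
	public InputSelection nextSelection() {
		readFirst = !readFirst;
		return readFirst ? InputSelection.FIRST : InputSelection.SECOND;
	}
}
```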
[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-490712462 @wuchong can you also help take a look at the upgrade? Thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19 URL: https://github.com/apache/flink/pull/8324#issuecomment-490712398 @flinkbot attention @wuchong This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-12392) FlinkRelMetadataQuery does not compile with Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-12392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836018#comment-16836018 ] Jark Wu commented on FLINK-12392: - Sure [~twalthr], we will figure it out today. > FlinkRelMetadataQuery does not compile with Scala 2.12 > -- > > Key: FLINK-12392 > URL: https://issues.apache.org/jira/browse/FLINK-12392 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.0 >Reporter: Chesnay Schepler >Priority: Blocker > Fix For: 1.9.0 > > > {code} > 10:57:51.770 [ERROR] > /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala:52: > error: value EMPTY in class RelMetadataQuery cannot be accessed in object > org.apache.calcite.rel.metadata.RelMetadataQuery > 10:57:51.770 [ERROR] Access to protected value EMPTY not permitted because > 10:57:51.770 [ERROR] enclosing package metadata in package plan is not a > subclass of > 10:57:51.770 [ERROR] class RelMetadataQuery in package metadata where target > is defined > 10:57:51.770 [ERROR] this(RelMetadataQuery.THREAD_PROVIDERS.get, > RelMetadataQuery.EMPTY) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] xuefuz commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#issuecomment-490709606 > @xuefuz @lirui-apache thanks for your review! The build failure is unrelated (python api issue). If there are no more comments, I will merge tonight I think I will go through it one more time. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#issuecomment-490695098 Rebased to latest master to resolve conflicts. Will wait for a green build before proceeding This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-12417) Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface
[ https://issues.apache.org/jira/browse/FLINK-12417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-12417. Resolution: Fixed merged in 1.9.0: c954780c9fb2c0cc5b3730caff7e99bacbd534ba > Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog > interface > - > > Key: FLINK-12417 > URL: https://issues.apache.org/jira/browse/FLINK-12417 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > As discussed with [~dawidwys], the original purpose to separate > ReadableCatalog and ReadableWritableCatalog is to isolate access to metadata. > However, we believe access control and authorization is orthogonal to design > of catalog APIs and should be of a different effort. > Thus, we propose to merge ReadableCatalog and ReadableWritableCatalog to > simplify the design. > cc [~twalthr] [~xuefuz] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] asfgit closed pull request #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface
asfgit closed pull request #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface URL: https://github.com/apache/flink/pull/8365 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#issuecomment-490692748 @xuefuz @lirui-apache thanks for your review! The build failure is unrelated (python api issue). If there are no more comments, I will merge tonight This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface
bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface URL: https://github.com/apache/flink/pull/8365#issuecomment-490692552 @xuefuz thanks for your review! ReadableCatalog.java is already deleted. Merging This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface
xuefuz commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface URL: https://github.com/apache/flink/pull/8365#issuecomment-490680574 Yes. Let's delete ReadableCatalog.java and the PR is good to go. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#issuecomment-490680873 ok. sounds good. thanks.
[GitHub] [flink] bowenli86 edited a comment on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 edited a comment on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#issuecomment-490654662 > Thanks for refactoring the code. It looks much cleaner now. > > However, I still notice a lot of commonality between the two implementations of createHiveTable() and createCatalogTable(). Maybe we can share more code between them and reduce the code duplication. @xuefuz they currently look similar mainly because HiveCatalog's implementation is only preliminary at the moment, as mentioned in the comment. E.g. it doesn't consider the input/output format and serde lib yet. As we enrich HiveCatalog, I expect they will diverge a lot. It would be better to try to unify their common parts in the end.
[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#issuecomment-490654662 > Thanks for refactoring the code. It looks much cleaner now. > > However, I still notice a lot of commonality between the two implementations of createHiveTable() and createCatalogTable(). Maybe we can share more code between them and reduce the code duplication. @xuefuz they currently look similar mainly because HiveCatalog's implementation is only preliminary at the moment, as mentioned in the comment. As we enrich HiveCatalog, I expect they will diverge a lot. It would be better to try to unify their common parts in the end.
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282251157 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable); + } + } catch (TException e) { + throw new CatalogException( +
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282251084 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { Review comment: In memory catalog also doesn't check that currently. We should solve that in "FLINK-12452: alterTable() in all catalogs should ensure existing base table and the new one are of the same type" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-12452) alterTable() should ensure existing base table and the new one are of the same type
Bowen Li created FLINK-12452: Summary: alterTable() should ensure existing base table and the new one are of the same type Key: FLINK-12452 URL: https://issues.apache.org/jira/browse/FLINK-12452 Project: Flink Issue Type: Sub-task Components: Connectors / Hive, Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Currently, none of the catalogs check whether the existing base table and the new one are of the same type in alterTable(). We should add that check to all catalogs
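For concreteness, a minimal sketch of the check this ticket asks for, reusing the getTable()/tablePath/newCatalogTable names seen in the diffs above; an illustration of the idea, not the eventual patch:

{code:java}
// Hedged sketch: inside alterTable(), reject a new metadata object whose
// concrete type differs from the existing one (e.g. replacing a table with a view).
CatalogBaseTable existingTable = getTable(tablePath);
if (existingTable.getClass() != newCatalogTable.getClass()) {
    throw new CatalogException(String.format(
        "Table types don't match. Existing table is '%s' and new table is '%s'",
        existingTable.getClass().getName(), newCatalogTable.getClass().getName()));
}
// ... proceed with the actual alteration only when the types agree ...
{code}

Comparing concrete classes also answers the table-vs-view question raised later in this thread, since a view and a table necessarily have different CatalogBaseTable subtypes.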
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282244476 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + public abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) + throws IllegalArgumentException; + + /** +* Create a CatalogBaseTable from a Hive table. +* +* @param hiveTable a Hive table +* @return a CatalogBaseTable +*/ + public abstract CatalogBaseTable createCatalogTable(Table hiveTable); + + /** +* Create a Hive table from a CatalogBaseTable. +* +* @param tablePath path of the table +* @param table a CatalogBaseTable +* @return a Hive table +*/ + public abstract Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table); Review comment: I feel "create" is better because it involves more complexity than simply converting one object to another, and it keeps the naming convention consistent. It also feels a bit weird to "convert" two things into one.
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282239896 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +333,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); } + // -- utils -- + + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) { + return hiveTableParams.entrySet().stream() + .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) + .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue())); + } + + /** +* Add a prefix to Flink-created properties to distinguish them from Hive-created properties. +*/ + private Map<String, String> buildFlinkProperties(Map<String, String> properties) { Review comment: I feel "mask" is a good name. I'm not sure about "unmask", because the retrieval not only unmasks Flink properties but also filters out Hive properties.
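As an illustration of the naming discussion, a self-contained sketch of the mask/retrieve pair, assuming the "flink." prefix from the diff; the class name and the use of substring instead of replace are my own choices here, not the PR's final code:

```java
import java.util.Map;
import java.util.stream.Collectors;

public final class FlinkPropertyMasking {
    // Prefix used to distinguish Flink-created properties from Hive-created ones.
    private static final String FLINK_PROPERTY_PREFIX = "flink.";

    // "Mask": prefix every Flink property before storing it in the Hive metastore.
    static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
        return properties.entrySet().stream()
            .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), Map.Entry::getValue));
    }

    // Retrieval: keep only the masked Flink properties and strip the prefix,
    // which implicitly filters out all Hive-created properties.
    static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
        return hiveTableParams.entrySet().stream()
            .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
            .collect(Collectors.toMap(
                e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), Map.Entry::getValue));
    }
}
```

Note that substring(prefix.length()) only strips the leading prefix, whereas the diff's replace(...) would also rewrite any later occurrence of "flink." inside a key.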
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282238893 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // -- tables and views-- @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - try { - client.dropTable( - tablePath.getDatabaseName(), - tablePath.getObjectName(), - // Indicate whether associated data should be deleted. - // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary - true, - ignoreIfNotExists); - } catch (NoSuchObjectException e) { - if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to drop table %s", tablePath.getFullName()), e); + public void validateCatalogBaseTable(CatalogBaseTable table) throws IllegalArgumentException { + // TODO: invalidate HiveCatalogView + if (table instanceof HiveCatalogTable) { + throw new IllegalArgumentException( + "Please use HiveCatalog to operate on HiveCatalogTable and HiveCatalogView."); } } @Override - public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, CatalogException { - try { - // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly - if (tableExists(tablePath)) { - ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); - // alter_table() doesn't throw a clear exception when new table already exists. Thus, check the table existence explicitly - if (tableExists(newPath)) { - throw new TableAlreadyExistException(catalogName, newPath); - } else { - Table table = getHiveTable(tablePath); - table.setTableName(newTableName); - client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); - } - } else if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to rename table %s", tablePath.getFullName()), e); - } - } + public CatalogBaseTable createCatalogTable(Table hiveTable) { + // Table schema + TableSchema tableSchema = createTableSchema( + hiveTable.getSd().getCols(), hiveTable.getPartitionKeys()); - @Override - public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) - throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - if (!databaseExists(tablePath.getDatabaseName())) { - throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); - } else { - try { - client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table)); - } catch (AlreadyExistsException e) { - if (!ignoreIfExists) { - throw new TableAlreadyExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); - } + // Table properties + Map
[GitHub] [flink] bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface
bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface URL: https://github.com/apache/flink/pull/8365#issuecomment-490639242 > Changes look good to me. However, I wonder if we should get rid of the ReadableCatalog class. Thanks for your review. I think in the future there might be a ReadOnlyCatalog or similar interface. We can create that when necessary, and I don't think we need it right now.
[GitHub] [flink] StefanRRichter commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates
StefanRRichter commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates URL: https://github.com/apache/flink/pull/8361#discussion_r282212329 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/SetArrayDeque.java ## @@ -16,20 +16,38 @@ * limitations under the License. */ -package org.apache.flink.runtime.io.network.partition.consumer; +package org.apache.flink.runtime.util; + +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Set; /** - * Listener interface implemented by consumers of {@link InputGate} instances - * that want to be notified of availability of buffer or event instances. + * Wrapper of {@link ArrayDeque} with a quick {@link HashSet}-based {@link #contains(Object)} method. */ -public interface InputGateListener { +public class SetArrayDeque<T> { + private final ArrayDeque<T> deque = new ArrayDeque<>(); + private final Set<T> set = new HashSet<>(); + + public boolean add(T element) { + return deque.add(element) || set.add(element); Review comment: It is not only about the return value; even worse, you can add duplicates to the deque because it ignores that the value may already be in the set. So this is not set semantics anymore.
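A minimal sketch of an add that preserves the set semantics the reviewer asks for, based on the fields shown in the diff above; a sketch of the point being made, not necessarily how the PR was finally fixed:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

// Deque wrapper with set semantics: an element is stored at most once.
public class SetArrayDeque<T> {
    private final ArrayDeque<T> deque = new ArrayDeque<>();
    private final Set<T> set = new HashSet<>();

    public boolean add(T element) {
        // Consult the set first: only touch the deque when the element is new,
        // so duplicates can never enter the deque and the return value is correct.
        if (set.add(element)) {
            deque.add(element);
            return true;
        }
        return false;
    }

    public boolean contains(T element) {
        return set.contains(element);
    }

    public T poll() {
        T element = deque.poll();
        if (element != null) {
            set.remove(element);
        }
        return element;
    }
}
```

Checking set.add(element) first makes the set the source of truth, which is exactly what the `||` short-circuit in the original version breaks.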
[GitHub] [flink] StefanRRichter commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates
StefanRRichter commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates URL: https://github.com/apache/flink/pull/8361#discussion_r282213460 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ## @@ -596,6 +600,12 @@ public void requestPartitions() throws IOException, InterruptedException { } } + private void resetIsAvailable() { + if (isAvailable.isDone()) { Review comment: Yes, the `||` version makes sense. The suggestion was just a shortcut version.
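For context, a small self-contained sketch of the CompletableFuture-based availability pattern this PR introduces, including the resetIsAvailable guard from the diff; everything beyond the isAvailable field and resetIsAvailable method name is an assumption:

```java
import java.util.concurrent.CompletableFuture;

// Consumers wait on the current future; the gate completes it when data
// arrives and swaps in a fresh future once it runs dry again.
class AvailabilityGate {
    private CompletableFuture<Void> isAvailable = new CompletableFuture<>();

    CompletableFuture<Void> isAvailable() {
        return isAvailable;
    }

    void notifyDataAvailable() {
        isAvailable.complete(null);
    }

    // Only allocate a new future if the current one has completed; otherwise
    // existing waiters would be left hanging on an abandoned instance.
    private void resetIsAvailable() {
        if (isAvailable.isDone()) {
            isAvailable = new CompletableFuture<>();
        }
    }
}
```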
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282206462 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + if (!tableExists(tablePath)) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } else { + Table newTable = createHiveTable(tablePath, newCatalogTable); + + // client.alter_table() requires a valid location + // thus, if new table doesn't have that, it reuses location of the old table + if (!newTable.getSd().isSetLocation()) { + Table oldTable = getHiveTable(tablePath); + newTable.getSd().setLocation(oldTable.getSd().getLocation()); + } + + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), newTable); + } + } catch (TException e) { + throw new CatalogException( +
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282205173 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { Review comment: Do we need to validate newCatalogTable? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282203125 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + public abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) Review comment: Use "protected" if we don't expect external callers.
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282205441 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database newHiveDatabase, boolean throw new CatalogException(String.format("Failed to alter database %s", name), e); } } + + // -- tables -- + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return createCatalogTable(getHiveTable(tablePath)); + } + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + validateCatalogBaseTable(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); + } else { + try { + client.createTable(createHiveTable(tablePath, table)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); + } + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + try { + // alter_table() doesn't throw a clear exception when target table doesn't exist. + // Thus, check the table existence explicitly + if (tableExists(tablePath)) { + ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); + // alter_table() doesn't throw a clear exception when new table already exists. + // Thus, check the table existence explicitly + if (tableExists(newPath)) { + throw new TableAlreadyExistException(catalogName, newPath); + } else { + Table table = getHiveTable(tablePath); + table.setTableName(newTableName); + client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); + } + } else if (!ignoreIfNotExists) { + throw new TableNotExistException(catalogName, tablePath); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to rename table %s", tablePath.getFullName()), e); + } + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { Review comment: Also, how do we ensure no alter for table to view and vise versa? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282203601 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + public abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) + throws IllegalArgumentException; + + /** +* Create a CatalogBaseTable from a Hive table. +* +* @param hiveTable a Hive table +* @return a CatalogBaseTable +*/ + public abstract CatalogBaseTable createCatalogTable(Table hiveTable); + + /** +* Create a Hive table from a CatalogBaseTable. +* +* @param tablePath path of the table +* @param table a CatalogBaseTable +* @return a Hive table +*/ + public abstract Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table); Review comment: Same as above. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282203452 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -89,6 +101,34 @@ private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { } } + // -- APIs -- + + /** +* Validate input base table. +* +* @param catalogBaseTable the base table to be validated +* @throws IllegalArgumentException thrown if the input base table is invalid. +*/ + public abstract void validateCatalogBaseTable(CatalogBaseTable catalogBaseTable) + throws IllegalArgumentException; + + /** +* Create a CatalogBaseTable from a Hive table. +* +* @param hiveTable a Hive table +* @return a CatalogBaseTable +*/ + public abstract CatalogBaseTable createCatalogTable(Table hiveTable); Review comment: Can we use word "convertTo" instead of "create"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282201837 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -363,4 +333,23 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath throw new UnsupportedOperationException(); } + // -- utils -- + + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private Map retrieveFlinkProperties(Map hiveTableParams) { + return hiveTableParams.entrySet().stream() + .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) + .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue())); + } + + /** +* Add a prefix to Flink-created properties to distinguish them from Hive-created properties. +*/ + private Map buildFlinkProperties(Map properties) { Review comment: Maybe we can use words "mask" and "unmask" instead of "build" and "retrieve". This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282199916 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // -- tables and views-- @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - try { - client.dropTable( - tablePath.getDatabaseName(), - tablePath.getObjectName(), - // Indicate whether associated data should be deleted. - // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary - true, - ignoreIfNotExists); - } catch (NoSuchObjectException e) { - if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to drop table %s", tablePath.getFullName()), e); + public void validateCatalogBaseTable(CatalogBaseTable table) throws IllegalArgumentException { + // TODO: invalidate HiveCatalogView + if (table instanceof HiveCatalogTable) { + throw new IllegalArgumentException( + "Please use HiveCatalog to operate on HiveCatalogTable and HiveCatalogView."); } } @Override - public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException, CatalogException { - try { - // alter_table() doesn't throw a clear exception when target table doesn't exist. Thus, check the table existence explicitly - if (tableExists(tablePath)) { - ObjectPath newPath = new ObjectPath(tablePath.getDatabaseName(), newTableName); - // alter_table() doesn't throw a clear exception when new table already exists. Thus, check the table existence explicitly - if (tableExists(newPath)) { - throw new TableAlreadyExistException(catalogName, newPath); - } else { - Table table = getHiveTable(tablePath); - table.setTableName(newTableName); - client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table); - } - } else if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to rename table %s", tablePath.getFullName()), e); - } - } + public CatalogBaseTable createCatalogTable(Table hiveTable) { + // Table schema + TableSchema tableSchema = createTableSchema( + hiveTable.getSd().getCols(), hiveTable.getPartitionKeys()); - @Override - public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) - throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - if (!databaseExists(tablePath.getDatabaseName())) { - throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName()); - } else { - try { - client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, table)); - } catch (AlreadyExistsException e) { - if (!ignoreIfExists) { - throw new TableAlreadyExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e); - } + // Table properties + Map properties =
[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282198787 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno // -- tables and views-- @Override - public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) - throws TableNotExistException, CatalogException { - try { - client.dropTable( - tablePath.getDatabaseName(), - tablePath.getObjectName(), - // Indicate whether associated data should be deleted. - // Set to 'true' for now because Flink tables shouldn't have data in Hive. Can be changed later if necessary - true, - ignoreIfNotExists); - } catch (NoSuchObjectException e) { - if (!ignoreIfNotExists) { - throw new TableNotExistException(catalogName, tablePath); - } - } catch (TException e) { - throw new CatalogException( - String.format("Failed to drop table %s", tablePath.getFullName()), e); + public void validateCatalogBaseTable(CatalogBaseTable table) throws IllegalArgumentException { + // TODO: invalidate HiveCatalogView + if (table instanceof HiveCatalogTable) { Review comment: I'm wondering if we should specify what we are expecting and throw exceptions if condition isn't met. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
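One way to express this suggestion is to whitelist the expected types instead of rejecting known-bad ones; a hedged sketch, where GenericCatalogTable and GenericCatalogView are hypothetical names for the generic metadata classes, not confirmed API:

```java
// Validate against what the catalog expects, so that any unknown
// CatalogBaseTable implementation fails fast with a clear message.
// GenericCatalogTable/GenericCatalogView are placeholder names here.
public void validateCatalogBaseTable(CatalogBaseTable table) throws IllegalArgumentException {
    if (!(table instanceof GenericCatalogTable) && !(table instanceof GenericCatalogView)) {
        throw new IllegalArgumentException(String.format(
            "GenericHiveMetastoreCatalog only supports GenericCatalogTable and GenericCatalogView, but got %s",
            table.getClass().getName()));
    }
}
```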
[jira] [Comment Edited] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes
[ https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835800#comment-16835800 ] Henrik edited comment on FLINK-12381 at 5/8/19 6:34 PM: Yes, you can see it like that (a new cluster), I suppose. So does that mean that flink is useless without HA then? Because if I don't have HA, and the node I'm running it on, or the k8s pod I'm running it in, restarts, it's a new cluster? In the optimal world, I would not have to manually change the specification of the job that runs, without the job that runs also having been changed. I.e. it goes against declarative running of resources in a k8s cluster to manually have to change the jobid whenever the pod is restarted. was (Author: haf): Yes, you can see it like that (a new cluster), I suppose. So does that mean that flink is useless without HA then? Because if I don't have HA, and the node I'm running it on, or the k8s pod I'm running it in, restarts, it's a new cluster? > W/o HA, upon a full restart, checkpointing crashes > -- > > Key: FLINK-12381 > URL: https://issues.apache.org/jira/browse/FLINK-12381 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.8.0 > Environment: Same as FLINK-\{12379, 12377, 12376} >Reporter: Henrik >Priority: Major > > {code:java} > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > 'gs://example_bucket/flink/checkpoints//chk-16/_metadata' > already exists > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) > at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) > ... 8 more > {code} > Instead, it should either just overwrite the checkpoint or fail to start the > job completely. Partial and undefined failure is not what should happen. > > Repro: > # Set up a single purpose job cluster (which could use much better docs btw!) > # Let it run with GCS checkpointing for a while with rocksdb/gs://example > # Kill it > # Start it -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
[ https://issues.apache.org/jira/browse/FLINK-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835505#comment-16835505 ] Henrik edited comment on FLINK-12379 at 5/8/19 6:29 PM: Yes, it's the "standalone-job.sh" entrypoint. [~StephanEwen] AFAIK it's the recommended way to deploy a standalone job? was (Author: haf): Yes, it's the "standalone-job.sh" entrypoint. [~StephanEwen] > Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint > > > Key: FLINK-12379 > URL: https://issues.apache.org/jira/browse/FLINK-12379 > Project: Flink > Issue Type: Bug > Components: FileSystems, Runtime / Coordination >Affects Versions: 1.8.0 > Environment: GCS + > > {code:java} > 1.8.0 > 1.8 > 2.11{code} > {code:java} > > > > > com.google.cloud.bigdataoss > gcs-connector > hadoop2-1.9.16 > > > org.apache.flink > flink-connector-filesystem_2.11 > ${flink.version} > > > org.apache.flink > flink-hadoop-fs > ${flink.version} > > > > org.apache.flink > flink-shaded-hadoop2 > ${hadoop.version}-${flink.version} > > {code} > > >Reporter: Henrik >Priority: Major > > When running one standalone-job w/ parallelism=1 + one taskmanager, you will > shortly get this crash > {code:java} > 2019-04-30 22:20:02,928 WARN org.apache.flink.runtime.jobmaster.JobMaster > - Error while processing checkpoint acknowledgement message > org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize > the pending checkpoint 5. > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756) > at > org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > 'gs://example_bucket/flink/checkpoints//chk-5/_metadata' > already exists > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) > at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) > at > 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) > ... 8 more > Caused by: java.nio.file.FileAlreadyExistsException: Object > gs://example_bucket/flink/checkpoints//chk-5/_metadata > already exists. > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286) > at > com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82) > ... 19 more > 2019-04-30 22:20:03,114 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 6 @
[jira] [Commented] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes
[ https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835800#comment-16835800 ] Henrik commented on FLINK-12381: Yes, you can see it like that (a new cluster), I suppose. So does that mean that flink is useless without HA then? Because if I don't have HA, and the node I'm running it on, or the k8s pod I'm running it in, restarts, it's a new cluster? > W/o HA, upon a full restart, checkpointing crashes > -- > > Key: FLINK-12381 > URL: https://issues.apache.org/jira/browse/FLINK-12381 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Runtime / Coordination >Affects Versions: 1.8.0 > Environment: Same as FLINK-\{12379, 12377, 12376} >Reporter: Henrik >Priority: Major > > {code:java} > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: > 'gs://example_bucket/flink/checkpoints//chk-16/_metadata' > already exists > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74) > at > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) > at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104) > at > org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829) > ... 8 more > {code} > Instead, it should either just overwrite the checkpoint or fail to start the > job completely. Partial and undefined failure is not what should happen. > > Repro: > # Set up a single purpose job cluster (which could use much better docs btw!) > # Let it run with GCS checkpointing for a while with rocksdb/gs://example > # Kill it > # Start it -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] kisimple commented on issue #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor
kisimple commented on issue #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor URL: https://github.com/apache/flink/pull/7757#issuecomment-490593540 Hi @azagrebin Sorry for the delay, I have addressed your comments, and I haven't reproduced the original issue.
[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#discussion_r282181650 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java ## @@ -42,23 +46,35 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.UnknownDBException; -import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * A catalog that persists all Flink streaming and batch metadata by using Hive metastore as a persistent storage. */ public class GenericHiveMetastoreCatalog extends HiveCatalogBase { private static final Logger LOG = LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + + // Flink tables should be stored as 'external' tables in Hive metastore Review comment: You are right, I verified it locally. I think using Hive managed tables would be fine here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8353#issuecomment-490590162 @xuefuz @lirui-apache thanks for your review. please take another look.
[GitHub] [flink] XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese
XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese URL: https://github.com/apache/flink/pull/8300#issuecomment-490584204 Hi @klion26, thank you for your review. I have modified it.
[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835749#comment-16835749 ] Chris Slotterback commented on FLINK-10455: --- The way I was able to reproduce this was forcing the job into a failed state by timing out writing a checkpoint on the filesystem. We occasionally see latency spikes in our env resulting in these job restarts. Most of the jobs are able to recover fine, but when using exactly once for the kafka producer the job gets stuck in this loop. My assumption is any job failure will reach the client processDisconnect method after the class loader is gone. > Potential Kafka producer leak in case of failures > - > > Key: FLINK-10455 > URL: https://issues.apache.org/jira/browse/FLINK-10455 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka >Affects Versions: 1.5.2 >Reporter: Nico Kruber >Assignee: Andrey Zagrebin >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1 > > > If the Kafka brokers' timeout is too low for our checkpoint interval [1], we > may get an {{ProducerFencedException}}. Documentation around > {{ProducerFencedException}} explicitly states that we should close the > producer after encountering it. > By looking at the code, it doesn't seem like this is actually done in > {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in > {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an > exception, we don't clean up (nor try to commit) any other transaction. > -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} > simply iterates over the {{pendingCommitTransactions}} which is not touched > during {{close()}} > Now if we restart the failing job on the same Flink cluster, any resources > from the previous attempt will still linger around. > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
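As background for the quoted requirement that a fenced producer must be closed, a self-contained sketch of that contract with the plain Kafka client; the broker address, topic, and transactional id are placeholders, and this illustrates the Kafka API contract rather than the FlinkKafkaProducer011 fix itself:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("transactional.id", "example-txn-id");   // placeholder id
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // A fenced producer can never be reused; close it so its threads
            // and buffers do not linger across job restarts.
            producer.close();
        }
    }
}
{code}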
[jira] [Closed] (FLINK-12448) FlinkKafkaProducer late closure after class loader
[ https://issues.apache.org/jira/browse/FLINK-12448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Slotterback closed FLINK-12448.
-------------------------------------
    Resolution: Duplicate

> FlinkKafkaProducer late closure after class loader
> --------------------------------------------------
>
>                 Key: FLINK-12448
>                 URL: https://issues.apache.org/jira/browse/FLINK-12448
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.7.2
>            Reporter: Chris Slotterback
>            Priority: Major
>
> During job failure/restart, a FlinkKafkaProducer configured with Semantic.EXACTLY_ONCE fails to disconnect properly due to a NoClassDefFoundError:
> {noformat}
> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
> 	at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
> 	at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
> 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> 	at java.lang.Thread.run(Thread.java:748){noformat}
>
> This begins a restart loop from which the job never recovers properly. It is reproducible only with the EXACTLY_ONCE semantic; AT_LEAST_ONCE properly disconnects and restarts without error.
> This issue is described in FLINK-10455, which has since been marked as fixed, but it is still reproducible.
[jira] [Created] (FLINK-12451) [Bitwise Functions] Add BIT_XOR function supported in Table API and SQL
Zhanchun Zhang created FLINK-12451:
--------------------------------------

             Summary: [Bitwise Functions] Add BIT_XOR function supported in Table API and SQL
                 Key: FLINK-12451
                 URL: https://issues.apache.org/jira/browse/FLINK-12451
             Project: Flink
          Issue Type: Sub-task
            Reporter: Zhanchun Zhang
            Assignee: Zhanchun Zhang

Bitwise XOR, returns an unsigned 64-bit integer. For example:

SELECT 1 ^ 1;  -- returns 0
SELECT 1 ^ 0;  -- returns 1
SELECT 11 ^ 3; -- returns 8
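As a rough illustration, such a function could be expressed as a user-defined scalar function built on Flink's `ScalarFunction` base class. The class name and the handling of unsigned semantics below are assumptions, not the implementation proposed in the ticket:

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical sketch only; not the ticket's actual implementation.
public class BitXor extends ScalarFunction {
    public long eval(long left, long right) {
        // Java's ^ is a bitwise XOR over the 64-bit representation, matching
        // the examples above (1 ^ 1 = 0, 1 ^ 0 = 1, 11 ^ 3 = 8). Note that
        // Java longs are signed; true unsigned semantics would need extra care.
        return left ^ right;
    }
}
```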
[jira] [Created] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL
Zhanchun Zhang created FLINK-12450:
--------------------------------------

             Summary: [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL
                 Key: FLINK-12450
                 URL: https://issues.apache.org/jira/browse/FLINK-12450
             Project: Flink
          Issue Type: Sub-task
            Reporter: Zhanchun Zhang
            Assignee: Zhanchun Zhang

BIT_LSHIFT shifts a long number to the left.
BIT_RSHIFT shifts a long number to the right.
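A minimal sketch of what these could look like as user-defined scalar functions follows. The ticket does not specify whether the right shift is logical or arithmetic, so the sketch assumes a logical (unsigned) shift; class names are illustrative only:

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical sketches of BIT_LSHIFT / BIT_RSHIFT; not the ticket's code.
class BitLshift extends ScalarFunction {
    public long eval(long value, int positions) {
        // Java masks the shift count to the range 0..63 for longs.
        return value << positions;
    }
}

class BitRshift extends ScalarFunction {
    public long eval(long value, int positions) {
        // '>>>' is the logical (unsigned) right shift; '>>' would keep the sign bit.
        return value >>> positions;
    }
}
```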
[jira] [Created] (FLINK-12449) [Bitwise Functions] Add BIT_AND, BIT_OR functions supported in Table API and SQL
Zhanchun Zhang created FLINK-12449:
--------------------------------------

             Summary: [Bitwise Functions] Add BIT_AND, BIT_OR functions supported in Table API and SQL
                 Key: FLINK-12449
                 URL: https://issues.apache.org/jira/browse/FLINK-12449
             Project: Flink
          Issue Type: Sub-task
          Components: Table SQL / API
            Reporter: Zhanchun Zhang
            Assignee: Zhanchun Zhang

Bitwise AND, e.g. SELECT BIT_AND(29, 15) returns 13.
Bitwise OR, e.g. SELECT BIT_OR(29, 15) returns 31.
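For completeness, a sketch in the same style as the other bitwise tickets; again the class names are hypothetical, not the ticket's implementation:

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical sketches of BIT_AND / BIT_OR; not the ticket's code.
class BitAnd extends ScalarFunction {
    public long eval(long left, long right) {
        // BIT_AND(29, 15): 0b11101 & 0b01111 = 0b01101 = 13
        return left & right;
    }
}

class BitOr extends ScalarFunction {
    public long eval(long left, long right) {
        // BIT_OR(29, 15): 0b11101 | 0b01111 = 0b11111 = 31
        return left | right;
    }
}
```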
[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure URL: https://github.com/apache/flink/pull/8242#issuecomment-490529494

Thanks for the further review @tillrohrmann. Let me try to explain the two concerns above.

- Whether to always transform `IOException` into `PartitionNotFoundException`?

  Actually I would like to do this wrapping in the constructor of `SpilledSubpartitionView`, which might throw an `IOException` while opening the disk file. Based on the current code, this is the only place that might throw an `IOException` during the creation of a `ResultSubpartitionView`. But considering that `SpillableSubpartition` and its reader view will soon be replaced by Stephan's new `BoundedBlockingSubpartition`, I implemented the wrapping the way it appears in the PR, which should also suit the new `BoundedBlockingSubpartition`. Strictly speaking, it would be better to transform the exception in the specific code path instead of covering the whole process. If you have concerns about this, I can move it into the internal process even though it will be removed soon, or we can postpone case `c` until `BoundedBlockingSubpartition` is merged.

- Whether to check that the file exists on the producer side and throw `PartitionNotFoundException` there?

  I think the check can only be done on the producer side while opening the file. Maybe there are other options, but I have not thought of any so far. However, the producer might not throw `PartitionNotFoundException` after finding that the file does not exist. Another option is for the producer to throw a special exception, say `PartitionOpenException`, which would be sent to the consumer via `ErrorResponse` over the network. The consumer would then fail directly and wrap it into a `PartitionNotFoundException` for the JM. This way we could avoid checking the partition state on the consumer side when receiving a `PartitionNotFoundException`, but it requires defining a new exception. My current approach in the PR is simpler, and, as I mentioned before, for blocking partitions the consumer can also fail directly after receiving `PartitionNotFoundException`.

I can create a JIRA ticket for the connection issue later. I have not focused on the unit tests yet, because they depend on the process we want to implement. Once you approve the current approach, I will fix the existing tests and add new tests to cover the cases.
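For illustration, the wrapping discussed in the first point could look roughly like the sketch below. The exception type and method names are stand-ins, not Flink's actual classes:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrative sketch only; not Flink's actual view/exception classes. */
class SpillFileSketch {
    static class PartitionNotFoundSketchException extends IOException {
        PartitionNotFoundSketchException(String partitionId, Throwable cause) {
            super("Partition not found: " + partitionId, cause);
        }
    }

    static FileChannel openSpillFile(Path spillFile, String partitionId) throws IOException {
        try {
            return FileChannel.open(spillFile, StandardOpenOption.READ);
        } catch (IOException e) {
            // Surface a missing or unreadable spill file as a missing
            // partition, so the consumer can trigger its partition-state
            // check and the JM can react accordingly.
            throw new PartitionNotFoundSketchException(partitionId, e);
        }
    }
}
```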
[jira] [Updated] (FLINK-10455) Potential Kafka producer leak in case of failures
[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-10455:
----------------------------------
    Fix Version/s:     (was: 1.6.3)
                       1.8.1
                       1.9.0
                       1.7.3

> Potential Kafka producer leak in case of failures
> -------------------------------------------------
>
>                 Key: FLINK-10455
>                 URL: https://issues.apache.org/jira/browse/FLINK-10455
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.5.2
>            Reporter: Nico Kruber
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we may get a {{ProducerFencedException}}. Documentation around {{ProducerFencedException}} explicitly states that we should close the producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an exception, we don't clean up (nor try to commit) any other transaction.
> -> From what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} simply iterates over the {{pendingCommitTransactions}}, which is not touched during {{close()}}.
> Now if we restart the failing job on the same Flink cluster, any resources from the previous attempt will still linger around.
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
[GitHub] [flink] pnowojski commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates
pnowojski commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates URL: https://github.com/apache/flink/pull/8361#discussion_r28202

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/SetArrayDeque.java

## @@ -16,20 +16,38 @@
  * limitations under the License.
  */

-package org.apache.flink.runtime.io.network.partition.consumer;
+package org.apache.flink.runtime.util;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Set;

 /**
- * Listener interface implemented by consumers of {@link InputGate} instances
- * that want to be notified of availability of buffer or event instances.
+ * Wrapper of {@link ArrayDeque} with a quick {@link HashSet}-based {@link #contains(Object)} method.
  */
-public interface InputGateListener {
+public class SetArrayDeque<T> {
+	private final ArrayDeque<T> deque = new ArrayDeque<>();
+	private final Set<T> set = new HashSet<>();
+
+	public boolean add(T element) {
+		return deque.add(element) || set.add(element);

Review comment:
   I can change it to `void add(T)` ;) But OK, I will add the unit test for this.
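The short-circuit the review is about can be made concrete: `ArrayDeque#add` always returns `true`, so `set.add(element)` on the right of `||` never runs and the set stays empty, breaking `contains`. One possible corrected variant (a sketch, not necessarily the change made in the PR):

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

// Sketch of one possible fix for the short-circuit bug discussed above.
class SetArrayDequeSketch<T> {
    private final ArrayDeque<T> deque = new ArrayDeque<>();
    private final Set<T> set = new HashSet<>();

    public boolean add(T element) {
        deque.add(element);      // ArrayDeque#add always returns true
        return set.add(element); // keep the membership set in sync
    }

    public boolean contains(T element) {
        return set.contains(element); // O(1) instead of ArrayDeque's O(n)
    }
}
```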
[GitHub] [flink] pnowojski commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates
pnowojski commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates URL: https://github.com/apache/flink/pull/8361#discussion_r282110599

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java

## @@ -197,32 +197,49 @@ public void requestPartitions() throws IOException, InterruptedException {
 		return Optional.of(bufferOrEvent);
 	}

-	@Override
-	public Optional<BufferOrEvent> pollNextBufferOrEvent() throws UnsupportedOperationException {
-		throw new UnsupportedOperationException();
-	}
-
-	private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+	private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
 		while (true) {
-			InputGate inputGate;
-			boolean moreInputGatesAvailable;
 			synchronized (inputGatesWithData) {
 				while (inputGatesWithData.size() == 0) {
-					inputGatesWithData.wait();
+					if (blocking) {
+						inputGatesWithData.wait();
+					} else {
+						resetIsAvailable();
+						return Optional.empty();
+					}
+				}
+				final InputGate inputGate = inputGatesWithData.remove();
+
+				// In case inputGatesWithData is inaccurate, do not block on an empty inputGate, but just poll the data.
+				Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
+
+				if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) {
+					// enqueue the inputGate at the end to avoid starvation
+					inputGatesWithData.add(inputGate);
+				} else {
+					inputGate.isAvailable().thenRun(() -> queueInputGate(inputGate));

Review comment:
   Yes, but this is not an issue here: if `inputGate.isAvailable() == AVAILABLE`, then `bufferOrEvent.get().moreAvailable()` returns true, so when hot-looping we do not enter this code path.
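The pattern under discussion — polling a gate and either re-enqueueing it at the end (when more data is buffered) or re-registering it via its availability future — can be sketched as follows. `Gate` and the method names are stand-ins, not Flink's API:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch of the re-registration pattern discussed above. */
class AvailabilityQueueSketch {
    interface Gate {
        CompletableFuture<?> isAvailable(); // completes when data arrives
        boolean poll();                     // true if more data remains buffered
    }

    private final Queue<Gate> gatesWithData = new ArrayDeque<>();

    void process(Gate gate) {
        if (gate.poll()) {
            // More data is buffered: re-enqueue at the end to avoid starvation.
            enqueue(gate);
        } else {
            // Drained: re-register only once the gate signals availability again.
            gate.isAvailable().thenRun(() -> enqueue(gate));
        }
    }

    private synchronized void enqueue(Gate gate) {
        gatesWithData.add(gate);
    }
}
```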