[jira] [Commented] (FLINK-12550) hostnames with a dot never receive local input splits
[ https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843673#comment-16843673 ]

Quan Shi commented on FLINK-12550:
----------------------------------

How about replacing the host with the FQDN in JobMaster?

{code:java}
// Code in method JobMaster.requestNextInputSplit:
final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
// change to:
final String host = slot != null ? slot.getTaskManagerLocation().getFQDNHostname() : null;
{code}

> hostnames with a dot never receive local input splits
> ------------------------------------------------------
>
>                 Key: FLINK-12550
>                 URL: https://issues.apache.org/jira/browse/FLINK-12550
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.8.0
>            Reporter: Felix seibert
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> LocatableInputSplitAssigner (in package api.common.io) fails to assign local input splits to hosts whose hostname contains a dot ("."). To reproduce, add the following test to LocatableSplitAssignerTest and execute it. It will always fail. In my mind, this is contrary to the expected behaviour, which is that the host should obtain the one split that is stored on the very same machine.
>
> {code:java}
> @Test
> public void testLocalSplitAssignmentForHostWithDomainName() {
> 	try {
> 		String hostNameWithDot = "testhost.testdomain";
> 		// load one split
> 		Set<LocatableInputSplit> splits = new HashSet<>();
> 		splits.add(new LocatableInputSplit(0, hostNameWithDot));
> 		// get next split for the host
> 		LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
> 		InputSplit is = ia.getNextInputSplit(hostNameWithDot, 0);
> 		// there should be exactly zero remote and one local assignment
> 		assertEquals(0, ia.getNumberOfRemoteAssignments());
> 		assertEquals(1, ia.getNumberOfLocalAssignments());
> 	}
> 	catch (Exception e) {
> 		e.printStackTrace();
> 		fail(e.getMessage());
> 	}
> }
> {code}
> I also experienced this error in practice, and will open a pull request later today to fix it.
>
> Note: I'm not sure if I selected the correct component category.
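For context, the suggestion above hinges on the hostname string used as the lookup key matching the string the split was registered with. Below is a minimal standalone sketch of that mismatch, assuming a plain map keyed by hostname; the class and field names are made up for illustration and this is not Flink's actual assigner code.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HostnameMismatchSketch {
    public static void main(String[] args) {
        // splits grouped by the hostname they were registered with (here: the FQDN)
        Map<String, List<Integer>> splitsByHost = new HashMap<>();
        splitsByHost.put("testhost.testdomain", Collections.singletonList(0));

        // a lookup performed with the shortened host name (everything before the first dot)
        String requestingHost = "testhost";
        List<Integer> local = splitsByHost.get(requestingHost);

        // prints "remote assignment" because the keys differ only by the domain suffix
        System.out.println(local == null ? "remote assignment" : "local assignment " + local);
    }
}
{code}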
[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285428078

## File path: flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
@@ -45,6 +45,7 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
 		env.getConfig().disableSysoutLogging();
 		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
 		env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+		env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

Review comment: I have added a new test case to verify the exception behavior.
[GitHub] [flink] xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog
xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8449#discussion_r285426012 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -407,4 +443,201 @@ private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e); } } + + // -- partitions -- + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + try { + Table hiveTable = getHiveTable(tablePath); + return client.getPartition(tablePath.getDatabaseName(), tablePath.getObjectName(), + getFullPartitionValues(tablePath, partitionSpec, getFieldNames(hiveTable.getPartitionKeys( != null; + } catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) { + return false; + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e); + } + } + + @Override + public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { + + validateCatalogPartition(partition); + + Table hiveTable = getHiveTable(tablePath); + + ensurePartitionedTable(tablePath, hiveTable); + + try { + client.add_partition(createHivePartition(hiveTable, partitionSpec, partition)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to create partition %s of table %s", partitionSpec, tablePath)); + } + } + + @Override + public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + try { + Table hiveTable = getHiveTable(tablePath); + ensurePartitionedTable(tablePath, hiveTable); + client.dropPartition(tablePath.getDatabaseName(), tablePath.getObjectName(), + getFullPartitionValues(tablePath, partitionSpec, getFieldNames(hiveTable.getPartitionKeys())), true); + } catch (NoSuchObjectException e) { + if (!ignoreIfNotExists) { + throw new PartitionNotExistException(catalogName, tablePath, partitionSpec); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath)); + } catch (TableNotPartitionedException | TableNotExistException | PartitionSpecInvalidException e) { + throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e); + } + } + + @Override + public List listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + Table hiveTable = getHiveTable(tablePath); + + ensurePartitionedTable(tablePath, hiveTable); + + try { + return client.listPartitionNames(tablePath.getDatabaseName(), tablePath.getObjectName(), (short) -1).stream() + .map(HiveCatalogBase::createPartitionSpec).collect(Collectors.toList()); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to list partitions of table %s", tablePath), e); + } + } + + 
@Override + public List listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + Table hiveTable =
[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-493837160

Triggered again; the SQL deadlock still occurs: https://travis-ci.org/apache/flink/jobs/534620910
[GitHub] [flink] xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog
xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8449#discussion_r285427319 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -407,4 +443,201 @@ private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e); } } + + // -- partitions -- + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + try { + Table hiveTable = getHiveTable(tablePath); + return client.getPartition(tablePath.getDatabaseName(), tablePath.getObjectName(), + getFullPartitionValues(tablePath, partitionSpec, getFieldNames(hiveTable.getPartitionKeys( != null; + } catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) { + return false; + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e); + } + } + + @Override + public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { + + validateCatalogPartition(partition); + + Table hiveTable = getHiveTable(tablePath); + + ensurePartitionedTable(tablePath, hiveTable); + + try { + client.add_partition(createHivePartition(hiveTable, partitionSpec, partition)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to create partition %s of table %s", partitionSpec, tablePath)); + } + } + + @Override + public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + try { + Table hiveTable = getHiveTable(tablePath); + ensurePartitionedTable(tablePath, hiveTable); + client.dropPartition(tablePath.getDatabaseName(), tablePath.getObjectName(), Review comment: getHiveTable() requires a round trip. Compared to precise exception msg, I'd say we minimize the number of trips. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
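The round-trip concern above amounts to fetching the table metadata once and reusing it for validation, instead of asking the metastore again for every check. A minimal sketch of that shape follows; the types and method names are hypothetical stand-ins, not the PR's actual API.

```java
import java.util.List;

class PartitionOps {
    // Stand-in for a metastore client; each call is one RPC round trip.
    interface MetastoreClient {
        TableMeta getTable(String db, String table);
        void dropPartition(String db, String table, List<String> partitionValues);
    }

    static class TableMeta {
        final List<String> partitionKeys;
        TableMeta(List<String> partitionKeys) { this.partitionKeys = partitionKeys; }
    }

    private final MetastoreClient client;
    PartitionOps(MetastoreClient client) { this.client = client; }

    void dropPartition(String db, String table, List<String> spec) {
        // fetch the table metadata once ...
        TableMeta meta = client.getTable(db, table);
        // ... and reuse it for validation instead of issuing another metastore call
        if (meta.partitionKeys.size() != spec.size()) {
            throw new IllegalArgumentException("partition spec does not match partition keys");
        }
        client.dropPartition(db, table, spec);
    }
}
```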
[GitHub] [flink] leesf commented on issue #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase
leesf commented on issue #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase URL: https://github.com/apache/flink/pull/8226#issuecomment-493836535 @azagrebin Thanks for opening another PR, I will close this one.
[GitHub] [flink] leesf closed pull request #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase
leesf closed pull request #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase URL: https://github.com/apache/flink/pull/8226
[GitHub] [flink] xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog
xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8449#discussion_r285427041 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -407,4 +443,201 @@ private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e); } } + + // -- partitions -- + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + try { + Table hiveTable = getHiveTable(tablePath); + return client.getPartition(tablePath.getDatabaseName(), tablePath.getObjectName(), + getFullPartitionValues(tablePath, partitionSpec, getFieldNames(hiveTable.getPartitionKeys( != null; + } catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) { + return false; + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e); + } + } + + @Override + public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { + + validateCatalogPartition(partition); + + Table hiveTable = getHiveTable(tablePath); + + ensurePartitionedTable(tablePath, hiveTable); + + try { + client.add_partition(createHivePartition(hiveTable, partitionSpec, partition)); Review comment: However, additional trips to HMS should be avoided if we can. I guess getHiveTable() requires round trip. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog
xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8449#discussion_r285426012 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java ## @@ -407,4 +443,201 @@ private Table getHiveTable(ObjectPath tablePath) throws TableNotExistException { String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e); } } + + // -- partitions -- + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + try { + Table hiveTable = getHiveTable(tablePath); + return client.getPartition(tablePath.getDatabaseName(), tablePath.getObjectName(), + getFullPartitionValues(tablePath, partitionSpec, getFieldNames(hiveTable.getPartitionKeys( != null; + } catch (NoSuchObjectException | TableNotExistException | PartitionSpecInvalidException e) { + return false; + } catch (TException e) { + throw new CatalogException( + String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e); + } + } + + @Override + public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { + + validateCatalogPartition(partition); + + Table hiveTable = getHiveTable(tablePath); + + ensurePartitionedTable(tablePath, hiveTable); + + try { + client.add_partition(createHivePartition(hiveTable, partitionSpec, partition)); + } catch (AlreadyExistsException e) { + if (!ignoreIfExists) { + throw new PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to create partition %s of table %s", partitionSpec, tablePath)); + } + } + + @Override + public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + try { + Table hiveTable = getHiveTable(tablePath); + ensurePartitionedTable(tablePath, hiveTable); + client.dropPartition(tablePath.getDatabaseName(), tablePath.getObjectName(), + getFullPartitionValues(tablePath, partitionSpec, getFieldNames(hiveTable.getPartitionKeys())), true); + } catch (NoSuchObjectException e) { + if (!ignoreIfNotExists) { + throw new PartitionNotExistException(catalogName, tablePath, partitionSpec); + } + } catch (TException e) { + throw new CatalogException( + String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath)); + } catch (TableNotPartitionedException | TableNotExistException | PartitionSpecInvalidException e) { + throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e); + } + } + + @Override + public List listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + Table hiveTable = getHiveTable(tablePath); + + ensurePartitionedTable(tablePath, hiveTable); + + try { + return client.listPartitionNames(tablePath.getDatabaseName(), tablePath.getObjectName(), (short) -1).stream() + .map(HiveCatalogBase::createPartitionSpec).collect(Collectors.toList()); + } catch (TException e) { + throw new CatalogException( + String.format("Failed to list partitions of table %s", tablePath), e); + } + } + + 
@Override + public List listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + Table hiveTable =
[GitHub] [flink] xuefuz commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#issuecomment-493834540

The PR is updated based on the review feedback. Re. the new files: they should have been part of the first commit, but I forgot to include them. They cannot go into a separate JIRA, because the first commit needs them and they cannot stand on their own. When we merge the PR, we can squash all commits into the first one, right? I'm not sure what the issue is; however, I can recommit the PR and do a forced push.
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285425254 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; Review comment: Updated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on issue #8397: [FLINK-11421][Table SQL/Runtime]Add compilation options to allow comp…
JingsongLi commented on issue #8397: [FLINK-11421][Table SQL/Runtime]Add compilation options to allow comp…
URL: https://github.com/apache/flink/pull/8397#issuecomment-493833300

@liyafan82 Thanks for your local benchmark. I think it is important to try to reproduce the results on a local machine. As you say, the cluster environment is very complicated, so the test data may not tell us where optimizations help and where regressions occur. The compiler optimization is easy to reproduce on a single machine, and everything is mockable. Can you add some other patterns, such as a vector computation benchmark?
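A vector-computation pattern could be expressed as a JMH micro-benchmark along the lines of the sketch below: a tight loop over primitive arrays, which is the kind of code most sensitive to the compilation options under discussion. The class name, array sizes, and benchmark settings are illustrative assumptions, not part of the PR.

```java
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

import java.util.concurrent.TimeUnit;

@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class VectorComputationBenchmark {

    private double[] a;
    private double[] b;

    @Setup
    public void setup() {
        a = new double[4096];
        b = new double[4096];
        for (int i = 0; i < a.length; i++) {
            a[i] = i;
            b[i] = 2.0 * i;
        }
    }

    @Benchmark
    public double dotProduct() {
        // element-wise multiply-accumulate over primitive arrays
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += a[i] * b[i];
        }
        return sum;
    }
}
```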
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285423726

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -48,6 +48,9 @@
 /**
  * A generic catalog implementation that holds all meta objects in memory.
  */
 public class GenericInMemoryCatalog implements Catalog {
+	public static final String FLINK_META_PROPERTY_KEY = "is_generic";

Review comment: This key is added automatically as part of meta-object instantiation. It is subject to the mask/retrieve operations.
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285423478 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { + 
return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[jira] [Commented] (FLINK-12348) Make TableConfig configurable from string-string map
[ https://issues.apache.org/jira/browse/FLINK-12348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843630#comment-16843630 ]

Jing Zhang commented on FLINK-12348:
-------------------------------------

[~twalthr], do you prefer that PlannerConfig provides methods for setting planner-related string-string pairs, while the underlying map is stored as a member of TableConfig? [~jark]'s concern also makes sense; how about the following solution? We provide two options for users to get/set planner configuration:
1. Directly read/write a member `org.apache.flink.configuration.Configuration` in TableConfig based on a certain key. This way, there is no difference between setting a planner-related configuration and an API-related configuration; however, users need to know the exact key string.
2. Call methods on PlannerConfig to read/write the configuration. This way, users must know that API-related configurations are set via TableConfig and planner-related configurations are set via PlannerConfig; however, they do not need to care about the exact key string.

> Make TableConfig configurable from string-string map
> -----------------------------------------------------
>
>                 Key: FLINK-12348
>                 URL: https://issues.apache.org/jira/browse/FLINK-12348
>             Project: Flink
>          Issue Type: Task
>          Components: Table SQL / API
>            Reporter: Jing Zhang
>            Assignee: Jing Zhang
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Since TableConfig already moved to the API module in [FLINK-11067|https://issues.apache.org/jira/browse/FLINK-11067], the TableConfig in blink-planner-module should not exist anymore. This issue aims to remove the TableConfig in blink-planner-module and use the TableConfig in the API module instead.
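A rough illustration of how the two options proposed in the comment above could look from the user's side. The classes below are stand-ins for TableConfig's underlying Configuration and for PlannerConfig; none of the names are actual Flink API.

{code:java}
import java.util.HashMap;
import java.util.Map;

class ConfigSketch {

    // Option 1: a raw string-string map behind TableConfig; users must know the key.
    static class Configuration {
        private final Map<String, String> values = new HashMap<>();
        void setString(String key, String value) { values.put(key, value); }
        String getString(String key, String defaultValue) { return values.getOrDefault(key, defaultValue); }
    }

    // Option 2: typed setters on a PlannerConfig facade that writes into the same
    // underlying map, so users never touch key strings.
    static class PlannerConfig {
        private final Configuration conf;
        PlannerConfig(Configuration conf) { this.conf = conf; }
        void setJoinReorderEnabled(boolean enabled) {
            conf.setString("optimizer.join-reorder-enabled", String.valueOf(enabled));
        }
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // option 1: direct key access
        conf.setString("optimizer.join-reorder-enabled", "true");

        // option 2: typed facade over the same map
        new PlannerConfig(conf).setJoinReorderEnabled(false);

        System.out.println(conf.getString("optimizer.join-reorder-enabled", "false"));
    }
}
{code}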
[jira] [Commented] (FLINK-11560) Translate "Flink Applications" page into Chinese
[ https://issues.apache.org/jira/browse/FLINK-11560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843623#comment-16843623 ]

Zhou Yumin commented on FLINK-11560:
-------------------------------------

Since the old PR [https://github.com/apache/flink-web/pull/182] for it had not been updated for months and the issue was still unassigned when I started, I decided to take the issue. Is that fine with you, [~jark] & [~stephenye]? I have created a new PR [https://github.com/apache/flink-web/pull/215] for it. This is one of the last open subtasks of the website translation. I hope for a quick review despite your heavy workload, so we can get FLINK-11526 all done. Thanks.

> Translate "Flink Applications" page into Chinese
> -------------------------------------------------
>
>                 Key: FLINK-11560
>                 URL: https://issues.apache.org/jira/browse/FLINK-11560
>             Project: Flink
>          Issue Type: Sub-task
>          Components: chinese-translation, Project Website
>            Reporter: Jark Wu
>            Assignee: Zhou Yumin
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Translate "Flink Applications" page into Chinese.
> The markdown file is located in: flink-web/flink-applications.zh.md
> The url link is: https://flink.apache.org/zh/flink-applications.html
> Please adjust the links in the page to Chinese pages when translating.
[GitHub] [flink] lirui-apache commented on issue #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog
lirui-apache commented on issue #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8449#issuecomment-493827022 @bowenli86 @xuefuz thanks for your comments. Please take a look at the updated PR. Thanks.
[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-493826613

@StefanRRichter After thinking about it seriously, my point of view has changed. There are two options:

* To keep the default behavior of all test cases and user jobs, the `tolerableCheckpointFailureNumber` default should apparently be `Integer.MAX_VALUE`.
* To keep compatibility, the `tolerableCheckpointFailureNumber` default should apparently be 0.

The key problem is that the two options' **range of action** is different. The `failOnCheckpointingErrors` option only covers a task's checkpointing errors on the TaskManager side (not even the whole execution phase), while the `tolerableCheckpointFailureNumber` option needs to cover the whole trigger and execution phases.

The argument for option 1: in many scenarios, if we set the `tolerableCheckpointFailureNumber` default to 0, the behavior of users' jobs would change; they would fail and restart more frequently. For example, a task that is not ready to checkpoint sends a decline message, which triggers the failure manager to fail and restart the job. That would change the default behavior of existing test cases and user jobs, which is why I changed the default value to `Integer.MAX_VALUE`, even though such declines are sporadic.

The argument for option 2: if we set the `tolerableCheckpointFailureNumber` default to `Integer.MAX_VALUE`, it may introduce a **compatibility issue**: how do we handle the `failOnCheckpointingErrors` config option in the future? The original thought was that `failOnCheckpointingErrors` (effectively two values: 0 and Integer.MAX_VALUE) is a subset of `tolerableCheckpointFailureNumber` (0 to Integer.MAX_VALUE), and we had wanted to deprecate the `failOnCheckpointingErrors` option in the third step.

What do you think?
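For reference, this is how the two options relate from a user's point of view: `setFailOnCheckpointingErrors` is the existing flag, and `setTolerableCheckpointFailureNumber` is the setter this PR introduces. The mapping in the comments reflects the reasoning above, not a final decision.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointFailureConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        // Existing flag: effectively only the two extremes.
        env.getCheckpointConfig().setFailOnCheckpointingErrors(true);    // roughly "tolerate 0 failures"
        // env.getCheckpointConfig().setFailOnCheckpointingErrors(false); // roughly "tolerate unlimited failures"

        // Option introduced by this PR: the full range in between
        // (the default under discussion is 0 vs. Integer.MAX_VALUE).
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    }
}
```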
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285416058 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -192,46 +590,62 @@ protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { "HiveCatalog only supports HiveCatalogTable and HiveCatalogView"); } - hiveTable.setSd(sd); - return hiveTable; } + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private static Map retrieveFlinkProperties(Map hiveTableParams) { + return hiveTableParams.entrySet().stream() + .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) + .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue())); + } + + /** +* Add a prefix to Flink-created properties to distinguish them from Hive-created properties. +*/ + private static Map maskFlinkProperties(Map properties) { + return properties.entrySet().stream() + .filter(e -> e.getKey() != null && e.getValue() != null) + .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue())); + } + // -- partitions -- @Override public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) - throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { Review comment: ok This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
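The hunk quoted above adds `maskFlinkProperties`/`retrieveFlinkProperties` for the property-prefix round trip. Below is a tiny standalone illustration of the same idea using only java.util; the prefix value mirrors the PR, but the snippet itself is not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class FlinkPropertyPrefixSketch {
    private static final String FLINK_PROPERTY_PREFIX = "flink.";

    // Add the prefix before storing Flink-created properties in the Hive metastore ...
    static Map<String, String> mask(Map<String, String> props) {
        return props.entrySet().stream()
            .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), Map.Entry::getValue));
    }

    // ... and strip it again when reading back, ignoring Hive-created properties.
    static Map<String, String> retrieve(Map<String, String> hiveParams) {
        return hiveParams.entrySet().stream()
            .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
            .collect(Collectors.toMap(e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> flinkProps = new HashMap<>();
        flinkProps.put("connector.type", "kafka");

        Map<String, String> stored = mask(flinkProps);
        stored.put("transient_lastDdlTime", "1558075200"); // a Hive-created property

        System.out.println(retrieve(stored)); // {connector.type=kafka}
    }
}
```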
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285415666 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { + 
return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285415326 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { + 
return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285415235 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. Review comment: Updated
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285415211 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { + 
return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285415172 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { + 
return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285414379 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { + 
return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] yanghua edited a comment on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua edited a comment on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#issuecomment-493817814 After fixing the state evolution issue, the Travis build still fails. The cause is a deadlock coming from the SQL tests; details: https://travis-ci.org/apache/flink/jobs/534067083 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi commented on issue #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
Aitozi commented on issue #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file URL: https://github.com/apache/flink/pull/8479#issuecomment-493818308 The travis-ci failure seems unrelated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] lirui-apache commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog
lirui-apache commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog URL: https://github.com/apache/flink/pull/8449#discussion_r285413443 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -200,44 +200,56 @@ protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { // -- partitions -- @Override - public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) - throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) - throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) - throws PartitionNotExistException, CatalogException { - throw new UnsupportedOperationException(); - } - - @Override - public List listPartitions(ObjectPath tablePath) - throws TableNotExistException, TableNotPartitionedException, CatalogException { - throw new UnsupportedOperationException(); + protected void validateCatalogPartition(CatalogPartition catalogPartition) throws CatalogException { + if (!(catalogPartition instanceof HiveCatalogPartition)) { + throw new CatalogException(String.format("%s can only handle %s but got %s", getClass().getSimpleName(), + HiveCatalogPartition.class.getSimpleName(), catalogPartition.getClass().getSimpleName())); + } } @Override - public List listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) - throws TableNotExistException, TableNotPartitionedException, CatalogException { - throw new UnsupportedOperationException(); - } + protected Partition createHivePartition(Table hiveTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition) + throws PartitionSpecInvalidException { + Partition partition = new Partition(); + List partCols = getFieldNames(hiveTable.getPartitionKeys()); + List partValues = getFullPartitionValues(new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()), + partitionSpec, partCols); + // validate partition values + for (int i = 0; i < partCols.size(); i++) { + if (StringUtils.isNullOrWhitespaceOnly(partValues.get(i))) { + throw new PartitionSpecInvalidException(catalogName, partCols, + new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()), partitionSpec); + } + } + HiveCatalogPartition hiveCatalogPartition = (HiveCatalogPartition) catalogPartition; + partition.setValues(partValues); + partition.setDbName(hiveTable.getDbName()); + partition.setTableName(hiveTable.getTableName()); + partition.setCreateTime((int) (System.currentTimeMillis() / 1000)); + partition.setParameters(hiveCatalogPartition.getProperties()); + partition.setSd(hiveTable.getSd().deepCopy()); + + String location = hiveCatalogPartition.getLocation(); + if (null == location) { Review comment: Makes sense. I'll have a try This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
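To make the validation in createHivePartition above concrete, a hypothetical sketch of a full versus partial partition spec for a table partitioned by (dt, country); the table layout and values are made up for illustration, and it assumes the Map-based CatalogPartitionSpec constructor referenced in the diff.
{code:java}
import org.apache.flink.table.catalog.CatalogPartitionSpec;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PartitionSpecSketch {
	public static void main(String[] args) {
		// Full spec: one value per partition column -> passes the validation loop.
		Map<String, String> full = new HashMap<>();
		full.put("dt", "2019-05-19");
		full.put("country", "us");
		CatalogPartitionSpec fullSpec = new CatalogPartitionSpec(full);

		// Partial spec: "country" is missing, so its resolved value is null/empty and the
		// validation loop in createHivePartition would throw PartitionSpecInvalidException.
		CatalogPartitionSpec partialSpec =
			new CatalogPartitionSpec(Collections.singletonMap("dt", "2019-05-19"));

		System.out.println(fullSpec + " vs " + partialSpec);
	}
}
{code}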
[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure URL: https://github.com/apache/flink/pull/8322#issuecomment-493817814 Retriggered the build once; it failed again. Reason: a deadlock in the SQL tests. Details: https://travis-ci.org/apache/flink/jobs/534067083 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8481: Release 1.7
flinkbot commented on issue #8481: Release 1.7 URL: https://github.com/apache/flink/pull/8481#issuecomment-493816547 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zehant opened a new pull request #8481: Release 1.7
zehant opened a new pull request #8481: Release 1.7 URL: https://github.com/apache/flink/pull/8481 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] beyond1920 commented on a change in pull request #8462: [FLINK-12496][table-planner-blink] Support translation from StreamExecGroupWindowAggregate to StreamTransformation.
beyond1920 commented on a change in pull request #8462: [FLINK-12496][table-planner-blink] Support translation from StreamExecGroupWindowAggregate to StreamTransformation. URL: https://github.com/apache/flink/pull/8462#discussion_r285412015 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala ## @@ -695,15 +699,43 @@ object AggregateUtil extends Enumeration { (propPos._1, propPos._2, propPos._3) } - def isRowtimeIndicatorType(fieldType: TypeInformation[_]): Boolean = { -TimeIndicatorTypeInfo.ROWTIME_INDICATOR == fieldType + def isRowtimeIndicatorType(fieldType: TypeInformation[_]): Boolean = fieldType match { Review comment: FlinkTypeFactory already references InternalType and RelDataType, and provides conversion functions between these two type systems. If we move `isRowtimeIndicatorType`, `isProctimeIndicatorType`, `isTimeIntervalType` and `isRowIntervalType` to FlinkTypeFactory, it would then reference three type systems, which is not desirable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-12088) Introduce unbounded streaming inner join operator
[ https://issues.apache.org/jira/browse/FLINK-12088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young reassigned FLINK-12088: -- Assignee: Jark Wu (was: Kurt Young) > Introduce unbounded streaming inner join operator > - > > Key: FLINK-12088 > URL: https://issues.apache.org/jira/browse/FLINK-12088 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Kurt Young >Assignee: Jark Wu >Priority: Major > > This operator is responsible for unbounded streaming inner join, and will be > optimized in following cases: > # If the join keys (with equality condition) are also primary key, we will > have a more efficient state layout > # If the inputs have primary keys, but join keys are not primary key, we can > also come up with an efficient state layout > # Inputs don't have primary keys, this will go to default implementation -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11943) Support TopN feature for SQL
[ https://issues.apache.org/jira/browse/FLINK-11943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-11943. -- Resolution: Duplicate Fix Version/s: 1.9.0 > Support TopN feature for SQL > > > Key: FLINK-11943 > URL: https://issues.apache.org/jira/browse/FLINK-11943 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Jark Wu >Priority: Major > Fix For: 1.9.0 > > > TopN is a frequently used feature in data analysis. We can use ORDER BY + > LIMIT to easily express a TopN query, e.g. {{SELECT * FROM T ORDER BY amount > DESC LIMIT 10}}. > But this is a global TopN, there is a great requirement for per-group TopN. > For example, top 10 shops for each category. In order to avoid introducing > new syntax for this, we would like to use traditional syntax to express it by > using {{ROW_NUMBER}} over window + {{FILTER}} to limit the numbers. > For example: > SELECT * > FROM ( > SELECT category, shopId, sales, > [ROW_NUMBER()|RANK()|DENSE_RANK()] OVER > (PARTITION BY category ORDER BY sales ASC) as rownum > FROM shop_sales > ) > WHERE rownum <= 10 > This issue is aiming to optimize this query to an {{Rank}} node instead of > {{Over}} plus {{Calc}}. And translate the {{Rank}} node into physical > operators. > There are some optimization for rank operator based on the different input of > the Rank. We would like to implement the basic and one-fit-all > implementation. And do the performance improvement later. > Here is a brief design doc: > https://docs.google.com/document/d/14JCV6X6hcpoA51loprgntZNxQ2NmnDLucxgGY8xVDuI/edit# -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11944) Support FirstRow and LastRow for SQL
[ https://issues.apache.org/jira/browse/FLINK-11944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kurt Young closed FLINK-11944. -- Resolution: Duplicate Fix Version/s: 1.9.0 > Support FirstRow and LastRow for SQL > > > Key: FLINK-11944 > URL: https://issues.apache.org/jira/browse/FLINK-11944 > Project: Flink > Issue Type: New Feature > Components: Table SQL / Runtime >Reporter: Jark Wu >Priority: Major > Fix For: 1.9.0 > > > Usually there are some duplicate data in the source due to some reasons. In > order to get a correct result, we need to do deduplication. FirstRow and > LastRow are two different strategy for deduplication. The syntax of FirstRow > and LastRow is similar to TopN, but order by a time attribute. For example: > SELECT * > FROM ( > SELECT *, > ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime DESC) as rownum > FROM T > ) > WHERE rownum = 1 > Some information about FirstRow & LastRow. > 1. the partition by key is the deduplicate key > 2. can only order by a time attribute (either proctime or rowtime) > 3. the rownum filter must be {{= 1}} or {{<= 1}} > 4. it is FirstRow when order direction is ASC, LastRow when order direction > is DESC > This issue is aiming to optimize this query to a FirstLastRow node instead of > Over plus Calc. And translate the it into physical operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] bowenli86 commented on issue #8477: [FLINK-12241][hive] Support function related operations in GenericHiveMetastoreCatalog
bowenli86 commented on issue #8477: [FLINK-12241][hive] Support function related operations in GenericHiveMetastoreCatalog URL: https://github.com/apache/flink/pull/8477#issuecomment-493807628 Let's wait for https://github.com/apache/flink/pull/8480 to be merged first; then I'll refactor and rebase this PR accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285392260 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { 
+ return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285404214 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { 
+ return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285405796 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -192,46 +590,62 @@ protected Table createHiveTable(ObjectPath tablePath, CatalogBaseTable table) { "HiveCatalog only supports HiveCatalogTable and HiveCatalogView"); } - hiveTable.setSd(sd); - return hiveTable; } + /** +* Filter out Hive-created properties, and return Flink-created properties. +*/ + private static Map retrieveFlinkProperties(Map hiveTableParams) { + return hiveTableParams.entrySet().stream() + .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX)) + .collect(Collectors.toMap(e -> e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue())); + } + + /** +* Add a prefix to Flink-created properties to distinguish them from Hive-created properties. +*/ + private static Map maskFlinkProperties(Map properties) { + return properties.entrySet().stream() + .filter(e -> e.getKey() != null && e.getValue() != null) + .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), e -> e.getValue())); + } + // -- partitions -- @Override public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) - throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException { Review comment: revert tab change for partitions APIs all below This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285392064 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. Review comment: update javadoc? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285404036 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { 
+ return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285405982 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java ## @@ -48,6 +48,9 @@ * A generic catalog implementation that holds all meta objects in memory. */ public class GenericInMemoryCatalog implements Catalog { + public static final String FLINK_META_PROPERTY_KEY = "is_generic"; Review comment: This seems to duplicate HiveCatalog.GENERIC_META_PROPERTY_KEY. We can either use 'is_generic' and let `maskFlinkProperties()`/`retrieveFlinkProperties()` handle the `flink.` prefix, or use 'flink.is_generic' directly before/after the calls to those methods. Either way, we need to be consistent. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
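A minimal, self-contained sketch of the first option above; mask() here only mirrors the behaviour of the maskFlinkProperties() helper shown earlier in this PR's diff, and every class and field name in the sketch is illustrative rather than merged code.
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class GenericFlagSketch {
	private static final String FLINK_PROPERTY_PREFIX = "flink.";
	// Option 1: store the bare key and let masking/retrieving handle the prefix.
	private static final String IS_GENERIC_KEY = "is_generic";

	// Mirrors HiveCatalog#maskFlinkProperties: prepend the prefix to every Flink property key.
	private static Map<String, String> mask(Map<String, String> props) {
		return props.entrySet().stream()
			.collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), Map.Entry::getValue));
	}

	public static void main(String[] args) {
		Map<String, String> flinkProps = new HashMap<>();
		flinkProps.put(IS_GENERIC_KEY, "true");
		// Prints {flink.is_generic=true}: what the Hive metastore would end up storing.
		System.out.println(mask(flinkProps));
		// Option 2 would store the already-prefixed "flink.is_generic" and skip masking for it;
		// the point of the review comment is to pick one style and use it everywhere.
	}
}
{code}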
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285403624 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { 
+ return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285403193 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { 
+ return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285404166 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { 
+ return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285403774 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; - LOG.info("Created HiveCatalog '{}'", catalogName); + protected final String catalogName; + protected final HiveConf hiveConf; + + private final String defaultDatabase; + protected IMetaStoreClient client; + + public HiveCatalog(String catalogName, String hivemetastoreURI) { + this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI)); } public HiveCatalog(String catalogName, HiveConf hiveConf) { - super(catalogName, hiveConf); + this(catalogName, DEFAULT_DB, hiveConf); + } + + public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty"); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty"); + this.catalogName = catalogName; + this.defaultDatabase = defaultDatabase; + this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); LOG.info("Created HiveCatalog '{}'", catalogName); } + private static HiveConf getHiveConf(String hiveMetastoreURI) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty"); + + HiveConf hiveConf = new HiveConf(); + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI); + return hiveConf; + } + + private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) { + try { + return RetryingMetaStoreClient.getProxy( + hiveConf, + null, + null, + HiveMetaStoreClient.class.getName(), + true); + } catch (MetaException e) { + throw new CatalogException("Failed to create Hive metastore client", e); + } + } + + @Override + public void open() throws CatalogException { + if (client == null) { + client = getMetastoreClient(hiveConf); + LOG.info("Connected to Hive metastore"); + } + + if (!databaseExists(defaultDatabase)) { + throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.", + defaultDatabase, catalogName)); + } + } + + @Override + public void close() throws CatalogException { + if (client != null) { + client.close(); + client = null; + LOG.info("Close connection to Hive metastore"); + } + } + // -- databases -- + public String getDefaultDatabase() throws CatalogException { 
+ return defaultDatabase; + } + @Override - protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) { - return new HiveCatalogDatabase( - hiveDatabase.getParameters(), - hiveDatabase.getLocationUri(), - hiveDatabase.getDescription()); + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { +
[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#discussion_r285403349 ## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java ## @@ -54,117 +75,494 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * A catalog implementation for Hive. + * Base class for catalogs backed by Hive metastore. */ -public class HiveCatalog extends HiveCatalogBase { +public class HiveCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class); + private static final String DEFAULT_DB = "default"; - public HiveCatalog(String catalogName, String hivemetastoreURI) { - super(catalogName, hivemetastoreURI); + // Prefix used to distinguish properties created by Hive and Flink, + // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore. + private static final String FLINK_PROPERTY_PREFIX = "flink."; + private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic"; Review comment: In Flink we usually name a constant as close as possible to its actual value. The 'PROPERTY' in the name is also a bit confusing: I named the key above 'FLINK_PROPERTY_PREFIX' because it is prepended to every property key, while this one is a single key itself. Shall it be something like "FLINK_IS_GENERIC_KEY"? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
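Purely for illustration, how the two constants would read with the rename suggested above; "FLINK_IS_GENERIC_KEY" is the reviewer's suggestion, not code that has been merged, and the holder class name is made up.
{code:java}
public final class CatalogPropertyKeys {
	// Prefix prepended to every Flink-created property key before it is written to the Hive metastore.
	public static final String FLINK_PROPERTY_PREFIX = "flink.";

	// A single, already-prefixed key marking an object as generic (created through Flink).
	public static final String FLINK_IS_GENERIC_KEY = "flink.is_generic";

	private CatalogPropertyKeys() {
		// constant holder, not meant to be instantiated
	}
}
{code}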
[GitHub] [flink] asfgit closed pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.
asfgit closed pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-12407) Add all table operators align Java Table API
[ https://issues.apache.org/jira/browse/FLINK-12407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-12407. --- Resolution: Fixed Fixed in master: b3604f7bee7456b8533e9ea222a833a2624e36c2 > Add all table operators align Java Table API > > > Key: FLINK-12407 > URL: https://issues.apache.org/jira/browse/FLINK-12407 > Project: Flink > Issue Type: Sub-task > Components: API / Python >Reporter: sunjincheng >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Add all table operators align Java Table API. the detail can be found in > [FLIP-38|https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] flinkbot edited a comment on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API.
flinkbot edited a comment on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#issuecomment-491201143 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into to Flink. - Approved by @sunjincheng121 [committer] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API.
sunjincheng121 commented on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API. URL: https://github.com/apache/flink/pull/8401#issuecomment-493798944 Thanks for the update! The doc change makes sense to me. @WeiZhong94 +1 to merge. @flinkbot approve all This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-11051) Add Bounded(Group Window) FlatAggregate operator to streaming Table API
[ https://issues.apache.org/jira/browse/FLINK-11051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-11051. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: 142462fa4224c3435736ae21d32aaeb1586f1dce > Add Bounded(Group Window) FlatAggregate operator to streaming Table API > --- > > Key: FLINK-11051 > URL: https://issues.apache.org/jira/browse/FLINK-11051 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Add FlatAggregate operator to streaming group window Table API as described > in > [FLIP-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739]. > The usage: > {code:java} > tab.window(Tumble/Session/Slide... as 'w) > .groupBy('w, 'k1, 'k2) > .flatAggregate(tableAggregate('a)) > .select('w.rowtime, 'k1, 'k2, 'col1, 'col2) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] asfgit closed pull request #8359: [FLINK-11051][table] Add streaming window FlatAggregate to Table API
asfgit closed pull request #8359: [FLINK-11051][table] Add streaming window FlatAggregate to Table API URL: https://github.com/apache/flink/pull/8359 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (FLINK-12534) Reduce the test cost for Python API
[ https://issues.apache.org/jira/browse/FLINK-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-12534. --- Resolution: Fixed Fix Version/s: 1.9.0 Fixed in master: 063ee1bb11a580123ea0212b2c617ec6f71fc72a > Reduce the test cost for Python API > --- > > Key: FLINK-12534 > URL: https://issues.apache.org/jira/browse/FLINK-12534 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Travis >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 20m > Remaining Estimate: 0h > > Currently, we add the Python API Travis test for Scala 2.12 / Java 9 / Hadoop > 2.4.1. Because the Python API uses Py4j to communicate with the JVM, the test for Java 9 > is enough, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-12534) Reduce the test cost for Python API
[ https://issues.apache.org/jira/browse/FLINK-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-12534: Issue Type: Sub-task (was: Improvement) Parent: FLINK-12308 > Reduce the test cost for Python API > --- > > Key: FLINK-12534 > URL: https://issues.apache.org/jira/browse/FLINK-12534 > Project: Flink > Issue Type: Sub-task > Components: API / Python, Travis >Affects Versions: 1.9.0 >Reporter: sunjincheng >Assignee: Wei Zhong >Priority: Major > Labels: pull-request-available > Time Spent: 20m > Remaining Estimate: 0h > > Currently, we add the Python API Travis test for Scala 2.12 / Java 9 / Hadoop > 2.4.1. Because the Python API uses Py4j to communicate with the JVM, the test for Java 9 > is enough, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] asfgit closed pull request #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API
asfgit closed pull request #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API URL: https://github.com/apache/flink/pull/8465 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] asfgit closed pull request #8470: [hotfix][table][docs] correct `distinct operator` doc for table API.
asfgit closed pull request #8470: [hotfix][table][docs] correct `distinct operator` doc for table API. URL: https://github.com/apache/flink/pull/8470 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API
flinkbot edited a comment on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API URL: https://github.com/apache/flink/pull/8465#issuecomment-493037224 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into to Flink. - Approved by @sunjincheng121 [committer] * ❗ 3. Needs [attention] from. - Needs attention by @zentol [PMC] * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API
sunjincheng121 commented on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API URL: https://github.com/apache/flink/pull/8465#issuecomment-493797710 @flinkbot approve all Merging This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #8470: [hotfix][table][docs] correct `distinct operator` doc for table API.
flinkbot edited a comment on issue #8470: [hotfix][table][docs] correct `distinct operator` doc for table API. URL: https://github.com/apache/flink/pull/8470#issuecomment-493284070 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into to Flink. - Approved by @sunjincheng121 [committer] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] sunjincheng121 commented on issue #8470: [hotfix][table][docs] correct `distinct operator` doc for table API.
sunjincheng121 commented on issue #8470: [hotfix][table][docs] correct `distinct operator` doc for table API. URL: https://github.com/apache/flink/pull/8470#issuecomment-493797564 @flinkbot approve all Merging This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
flinkbot commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#issuecomment-493768589 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480#issuecomment-493768604 cc: @bowenli86 @twalthr This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xuefuz opened a new pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…
xuefuz opened a new pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat… URL: https://github.com/apache/flink/pull/8480 …alog ## What is the purpose of the change This PR combines GenericHiveMetastoreCatalog and HiveCatalog into a single class. The reasons are: 1. They both talk to the Hive metastore to store/retrieve metadata, and the implementations are very similar. 2. Users need two catalogs in order to store both generic metadata and Hive metadata. If the two point to the same Hive metastore instance, each sees the same list of meta objects (such as tables) but may not understand objects it did not create itself, which creates a lot of confusion for users. If the two catalogs don't share a Hive metastore, that is a deployment overhead. Using a single catalog avoids both problems. To differentiate the two types of meta objects in a single catalog, generic meta objects will have a property named "is_generic". ## Brief change log - Combined and merged code in GenericHiveMetastoreCatalog and HiveCatalogBase into HiveCatalog - Removed the GenericHiveMetastoreCatalog and HiveCatalogBase classes - Added abstract classes for tables/views/functions/partitions/databases for both types of meta objects - Adapted existing tests ## Verifying this change This change is already covered by existing tests, such as GenericInMemoryCatalogTest, GenericHiveMetastoreCatalogTest, and HiveCatalogTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (No) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (No) - The S3 file system connector: (No) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
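To make the single-catalog idea in the PR description above concrete, here is a minimal Java sketch of how a catalog could tag and recognize generic (non-Hive) meta objects through a property map. The key name "is_generic" comes from the PR description; the class and method names below are hypothetical illustrations only, not the PR's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: marking and detecting generic (non-Hive) meta objects via a property map. */
public class GenericFlagSketch {

    // Property key named in the PR description; everything else here is hypothetical.
    private static final String IS_GENERIC = "is_generic";

    /** Returns a copy of the given properties with the generic marker set. */
    public static Map<String, String> markGeneric(Map<String, String> properties) {
        Map<String, String> tagged = new HashMap<>(properties);
        tagged.put(IS_GENERIC, "true");
        return tagged;
    }

    /** Checks whether stored properties identify a generic meta object. */
    public static boolean isGeneric(Map<String, String> properties) {
        return Boolean.parseBoolean(properties.getOrDefault(IS_GENERIC, "false"));
    }

    public static void main(String[] args) {
        Map<String, String> props = markGeneric(new HashMap<>());
        System.out.println(isGeneric(props)); // prints: true
    }
}
```

The point of the design is that a single catalog instance can store both kinds of objects side by side and filter on the marker when listing or resolving them.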
[jira] [Updated] (FLINK-12552) Combine HiveCatalog and GenericHiveMetastoreCatalog
[ https://issues.apache.org/jira/browse/FLINK-12552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12552: --- Labels: pull-request-available (was: ) > Combine HiveCatalog and GenericHiveMetastoreCatalog > --- > > Key: FLINK-12552 > URL: https://issues.apache.org/jira/browse/FLINK-12552 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12552) Combine HiveCatalog and GenericHiveMetastoreCatalog
Xuefu Zhang created FLINK-12552: --- Summary: Combine HiveCatalog and GenericHiveMetastoreCatalog Key: FLINK-12552 URL: https://issues.apache.org/jira/browse/FLINK-12552 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Xuefu Zhang Assignee: Xuefu Zhang -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402#discussion_r285383592 ## File path: flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.api.core; + +import org.apache.flink.ml.api.misc.param.ParamInfo; +import org.apache.flink.ml.api.misc.param.WithParams; +import org.apache.flink.ml.api.misc.persist.Persistable; +import org.apache.flink.ml.util.param.ExtractParamInfosUtil; +import org.apache.flink.ml.util.persist.MLStageFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Base class for a stage in a pipeline. The interface is only a concept, and does not have any + * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes + * should inherit this interface directly. + * + * Each pipeline stage is with parameters and meanwhile persistable. + * + * @param <T> The class type of the PipelineStage implementation itself, used by {@link + *org.apache.flink.ml.api.misc.param.WithParams} + * @see WithParams + */ +interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable, + Persistable { + + default String toJson() { Review comment: +1 to making this more flexible. If we want to support other formats, e.g. PMML, for serialization/deserialization, then toJson/fromJson no longer matches the logic. I suggest moving the serialization/deserialization logic out of this interface into a separate object, and injecting the serializer/deserializer into the pipeline object. I mean the following: ```java class ExampleTransformer implements Transformer<ExampleTransformer>{ public ExampleTransformer(PipelineSerializer implementationForSpecificFormat){ this.serializer=implementationForSpecificFormat; } public String serialization() {// renamed version of toJson() return this.serializer.serialize(this); } . //some approach with deserializer } ``` or move the serialization and deserialization methods out of the pipeline objects entirely: ```java Transformer example = new ExampleTransformer(); //actions with ExampleTransformer String serializedPipeline = new PipelineSerializer("supported_format_name_like_json").serialize(example); ``` I think this approach is more flexible for supporting different serialization/deserialization formats for pipelines, and these are changes we can make in a follow-up PR if the Flink community approves this proposal. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib
ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib URL: https://github.com/apache/flink/pull/8402#discussion_r285383592 ## File path: flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java ## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.api.core; + +import org.apache.flink.ml.api.misc.param.ParamInfo; +import org.apache.flink.ml.api.misc.param.WithParams; +import org.apache.flink.ml.api.misc.persist.Persistable; +import org.apache.flink.ml.util.param.ExtractParamInfosUtil; +import org.apache.flink.ml.util.persist.MLStageFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Base class for a stage in a pipeline. The interface is only a concept, and does not have any + * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes + * should inherit this interface directly. + * + * Each pipeline stage is with parameters and meanwhile persistable. + * + * @param <T> The class type of the PipelineStage implementation itself, used by {@link + *org.apache.flink.ml.api.misc.param.WithParams} + * @see WithParams + */ +interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable, + Persistable { + + default String toJson() { Review comment: +1 to renaming the methods for serialization/deserialization of model stages. If we want to support other formats, e.g. PMML, for serialization/deserialization, then toJson/fromJson no longer matches the logic. I suggest moving the serialization/deserialization logic out of this interface into a separate object, and injecting the serializer/deserializer into the pipeline object. I mean the following: ```java class ExampleTransformer implements Transformer<ExampleTransformer>{ public ExampleTransformer(PipelineSerializer implementationForSpecificFormat){ this.serializer=implementationForSpecificFormat; } public String serialization() {// renamed version of toJson() return this.serializer.serialize(this); } . //some approach with deserializer } ``` or move the serialization and deserialization methods out of the pipeline objects entirely: ```java Transformer example = new ExampleTransformer(); //actions with ExampleTransformer String serializedPipeline = new PipelineSerializer("supported_format_name_like_json").serialize(example); ``` I think this approach is more flexible for supporting different serialization/deserialization formats for pipelines, and these are changes we can make in a follow-up PR if the community approves this proposal. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
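For readers following the review thread above, here is a minimal, self-contained Java sketch of the serializer-injection pattern ex00 proposes. All names (PipelineSerializer, ClassNameSerializer, ExampleStage) are hypothetical and only illustrate the idea; they are not part of any existing Flink ML API.

```java
/** Sketch of injecting a pluggable serializer into a pipeline stage (all names hypothetical). */
public final class PipelineSerializationSketch {

    /** A pluggable serializer; a JSON or PMML implementation could sit behind this interface. */
    interface PipelineSerializer {
        String serialize(Object stage);
    }

    /** Trivial stand-in implementation that only records the stage's class name. */
    static final class ClassNameSerializer implements PipelineSerializer {
        @Override
        public String serialize(Object stage) {
            return "{\"stage\":\"" + stage.getClass().getName() + "\"}";
        }
    }

    /** A stage that delegates persistence to whatever serializer it was constructed with. */
    static final class ExampleStage {
        private final PipelineSerializer serializer;

        ExampleStage(PipelineSerializer serializer) {
            this.serializer = serializer;
        }

        String persist() {
            return serializer.serialize(this);
        }
    }

    public static void main(String[] args) {
        ExampleStage stage = new ExampleStage(new ClassNameSerializer());
        System.out.println(stage.persist());
    }
}
```

The design choice under discussion is simply whether the stage owns its serialization format (toJson/fromJson) or receives it from the outside, which keeps the stage interface format-agnostic.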
[GitHub] [flink] Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function
Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function URL: https://github.com/apache/flink/pull/8280#issuecomment-493753651 ping @StephanEwen This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi commented on issue #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
Aitozi commented on issue #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file URL: https://github.com/apache/flink/pull/8479#issuecomment-493753561 Hi @StefanRRichter, could you take a look at this PR when you have time? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
flinkbot commented on issue #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file URL: https://github.com/apache/flink/pull/8479#issuecomment-493753515 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] Aitozi opened a new pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file
Aitozi opened a new pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file URL: https://github.com/apache/flink/pull/8479 ## What is the purpose of the change As described in the [jira](https://issues.apache.org/jira/browse/FLINK-11193), the user's custom configuration, which is set via the `backend.configure()` method, will be overridden by the configuration loaded from flink-conf.yaml. I think the config set in code should have a higher priority than the default file configuration. ## Brief change log Merge the configurations before creating the new state backend. ## Verifying this change Added a test case to verify. This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
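To illustrate the precedence the PR above argues for, here is a minimal sketch using plain Java maps rather than Flink's Configuration class. The option key is only an example, and this is not the PR's actual code; it only shows the intended "programmatic settings win over flink-conf.yaml defaults" merge order.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: programmatic settings override defaults loaded from flink-conf.yaml. */
public class BackendConfigMergeSketch {

    /** The user map is applied last, so its entries win over the file defaults. */
    static Map<String, String> merge(Map<String, String> fromFile, Map<String, String> fromUser) {
        Map<String, String> effective = new HashMap<>(fromFile); // defaults from flink-conf.yaml
        effective.putAll(fromUser);                              // backend.configure(...) settings take priority
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> file = new HashMap<>();
        file.put("state.backend.incremental", "false"); // example option key only

        Map<String, String> user = new HashMap<>();
        user.put("state.backend.incremental", "true");

        // The user's programmatic choice should survive the merge.
        System.out.println(merge(file, user)); // prints: {state.backend.incremental=true}
    }
}
```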
[jira] [Comment Edited] (FLINK-12550) hostnames with a dot never receive local input splits
[ https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843397#comment-16843397 ] Felix seibert edited comment on FLINK-12550 at 5/19/19 11:25 AM: - After opening PR #8478 yesterday, I have some additional considerations. The status quo is the following: To check if an input split is locally available for a taskmanager, the hostname of the taskmanager is compared to the hostname of the input split. This happens in this [line|https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]: {code:java} if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost)){code} h is the hostname of a machine hosting the input split, flinkHost is the taskmanager that is looking for an input split. NetUtils.getHostnameFromFQDN() truncates at the first occurrence of a ".". So, if a split is present on "host.domain", and the hostname of the taskmanager is "host.domain" too, we actually check whether "host".equals("host.domain"), which is not true. PR #8478 applies getHostnameFromFQDN() to the taskmanager hostname as well, so it seems that this problem is fixed. BUT. What if there is a taskmanager on host "host.cluster1.domain", and an input split on host "host.cluster2.domain"? isLocal() would recognize this split as being on the same host as the taskmanager, which is clearly not the case. So to me it looks like getHostNameFromFQDN() shouldn't be applied to either of the two compared hostnames. Or is there any reason why it should be applied? was (Author: felxe): After opening PR #8478 yesterday, I have some additional considerations. The status quo is the following: To check if an input split is locally available for a taskmanager, the hostname of the taskmanager is compared to the hostname of the input split. This happens in this [line|https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]: {code:java} if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost)){code} h is the hostname of a machine hosting the input split, flinkHost is the taskmanager that is looking for an input split. NetUtils.getHostnameFromFQDN() truncates at the first occurrence of a ".". So, if a split is present on "host.domain", and the hostname of the taskmanager is "host.domain" too, we actually check whether "host".equals("host.domain"), which is not true. PR #8478 applies getHostnameFromFQDN() to the taskmanager hostname as well, so it seems that this problem is fixed. BUT. What if there is a taskmanager on host "host.cluster1.domain", and an input split on host "host.cluster2.domain"? isLocal() would recognize this split as being on the same host as the taskmanager, which is clearly not the case. So to me it looks like getHostNameFromFQDN() shouldn't be applied to either of the two compared hostnames. Or is there any reason why it should be applied? 
> hostnames with a dot never receive local input splits > - > > Key: FLINK-12550 > URL: https://issues.apache.org/jira/browse/FLINK-12550 > Project: Flink > Issue Type: Bug > Components: API / DataSet >Affects Versions: 1.8.0 >Reporter: Felix seibert >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > LocatableInputSplitAssigner (in package api.common.io) fails to assign local > input splits to hosts whose hostname contains a dot ("."). To reproduce add > the following test to LocatableSplitAssignerTest and execute it. It will > always fail. In my mind, this is contrary to the expected behaviour, which is > that the host should obtain the one split that is stored on the very same > machine. > > {code:java} > @Test > public void testLocalSplitAssignmentForHostWithDomainName() { >try { > String hostNameWithDot = "testhost.testdomain"; > // load one split > Set splits = new HashSet(); > splits.add(new LocatableInputSplit(0, hostNameWithDot)); > // get next split for the host > LocatableInputSplitAssigner ia = new > LocatableInputSplitAssigner(splits); > InputSplit is = null; > ia.getNextInputSplit(hostNameWithDot, 0); > // there should be exactly zero remote and one local assignment > assertEquals(0, ia.getNumberOfRemoteAssignments()); > assertEquals(1, ia.getNumberOfLocalAssignments()); >} >catch (Exception e) { > e.printStackTrace(); > fail(e.getMessage()); >} > } > {code} > I also
[jira] [Commented] (FLINK-12550) hostnames with a dot never receive local input splits
[ https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843397#comment-16843397 ] Felix seibert commented on FLINK-12550: --- After opening PR #8478 yesterday, I have some additional considerations. The status quo is the following: * To check if an input split is locally available for a taskmanager, the hostname of the taskmanager is compared to the hostname of the input split. This happens in [this line|https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]: {code:java} if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost)){code} h is the hostname of a machine hosting the input split, flinkHost is the taskmanager that is looking for an input split. NetUtils.getHostnameFromFQDN() truncates at the first occurrence of a ".". So, if a split is present on "host.domain", and the hostname of the taskmanager is "host.domain" too, we actually check whether "host".equals("host.domain"), which is not true. PR #8478 applies getHostnameFromFQDN() to the taskmanager hostname as well, so it seems that this problem is fixed. BUT. What if there is a taskmanager on host "host.cluster1.domain", and an input split on host "host.cluster2.domain"? isLocal() would recognize this split as being on the same host as the taskmanager, which is clearly not the case. So to me it looks like getHostNameFromFQDN() shouldn't be applied to either of the two compared hostnames. Or is there any reason why it should be applied? > hostnames with a dot never receive local input splits > - > > Key: FLINK-12550 > URL: https://issues.apache.org/jira/browse/FLINK-12550 > Project: Flink > Issue Type: Bug > Components: API / DataSet >Affects Versions: 1.8.0 >Reporter: Felix seibert >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > LocatableInputSplitAssigner (in package api.common.io) fails to assign local > input splits to hosts whose hostname contains a dot ("."). To reproduce add > the following test to LocatableSplitAssignerTest and execute it. It will > always fail. In my mind, this is contrary to the expected behaviour, which is > that the host should obtain the one split that is stored on the very same > machine. > > {code:java} > @Test > public void testLocalSplitAssignmentForHostWithDomainName() { >try { > String hostNameWithDot = "testhost.testdomain"; > // load one split > Set splits = new HashSet(); > splits.add(new LocatableInputSplit(0, hostNameWithDot)); > // get next split for the host > LocatableInputSplitAssigner ia = new > LocatableInputSplitAssigner(splits); > InputSplit is = null; > ia.getNextInputSplit(hostNameWithDot, 0); > // there should be exactly zero remote and one local assignment > assertEquals(0, ia.getNumberOfRemoteAssignments()); > assertEquals(1, ia.getNumberOfLocalAssignments()); >} >catch (Exception e) { > e.printStackTrace(); > fail(e.getMessage()); >} > } > {code} > I also experienced this error in practice, and will later today open a pull > request to fix it. > > Note: I'm not sure if I selected the correct component category. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
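For illustration, the following is a minimal sketch of the comparison semantics argued for in the comment above, i.e. comparing the two full hostnames without truncating either of them at the first dot. It is not the actual LocatableInputSplitAssigner code; the class and method names are hypothetical.

{code:java}
import java.util.Locale;

/** Sketch: locality check comparing the full hostnames, with no FQDN truncation on either side. */
public class HostLocalityCheckSketch {

    /** Case-insensitive equality of the two complete hostnames. */
    static boolean isLocal(String splitHost, String taskManagerHost) {
        if (splitHost == null || taskManagerHost == null) {
            return false;
        }
        return splitHost.toLowerCase(Locale.ROOT).equals(taskManagerHost.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(isLocal("host.domain", "host.domain"));                   // true
        System.out.println(isLocal("host.cluster1.domain", "host.cluster2.domain")); // false, unlike a truncating comparison
    }
}
{code}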
[jira] [Updated] (FLINK-12550) hostnames with a dot never receive local input splits
[ https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Felix seibert updated FLINK-12550: -- Description: LocatableInputSplitAssigner (in package api.common.io) fails to assign local input splits to hosts whose hostname contains a dot ("."). To reproduce add the following test to LocatableSplitAssignerTest and execute it. It will always fail. In my mind, this is contrary to the expected behaviour, which is that the host should obtain the one split that is stored on the very same machine. {code:java} @Test public void testLocalSplitAssignmentForHostWithDomainName() { try { String hostNameWithDot = "testhost.testdomain"; // load one split Set splits = new HashSet(); splits.add(new LocatableInputSplit(0, hostNameWithDot)); // get next split for the host LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); InputSplit is = null; ia.getNextInputSplit(hostNameWithDot, 0); // there should be exactly zero remote and one local assignment assertEquals(0, ia.getNumberOfRemoteAssignments()); assertEquals(1, ia.getNumberOfLocalAssignments()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } {code} I also experienced this error in practice, and will later today open a pull request to fix it. Note: I'm not sure if I selected the correct component category. was: LocatableInputSplitAssigner (in package api.common.io) fails to assign local input splits to hosts whose hostname contains a dot ("."). To reproduce add the following test to LocatableSplitAssignerTest and execute it. It will always fail. In my mind, this is contrary to the expected behaviour, which is that the host should obtain the one split that is stored on the very same machine. {code:java} @Test public void testLocalSplitAssignmentForHostWithDomainName() { try { String hostNameWithDot = "testhost.testdomain"; // load one split Set splits = new HashSet(); splits.add(new LocatableInputSplit(0, hostNameWithDot)); // get all available splits LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); InputSplit is = null; ia.getNextInputSplit(hostNameWithDot, 0); assertEquals(0, ia.getNumberOfRemoteAssignments()); assertEquals(1, ia.getNumberOfLocalAssignments()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); } } {code} I also experienced this error in practice, and will later today open a pull request to fix it. Note: I'm not sure if I selected the correct component category. > hostnames with a dot never receive local input splits > - > > Key: FLINK-12550 > URL: https://issues.apache.org/jira/browse/FLINK-12550 > Project: Flink > Issue Type: Bug > Components: API / DataSet >Affects Versions: 1.8.0 >Reporter: Felix seibert >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > LocatableInputSplitAssigner (in package api.common.io) fails to assign local > input splits to hosts whose hostname contains a dot ("."). To reproduce add > the following test to LocatableSplitAssignerTest and execute it. It will > always fail. In my mind, this is contrary to the expected behaviour, which is > that the host should obtain the one split that is stored on the very same > machine. 
> > {code:java} > @Test > public void testLocalSplitAssignmentForHostWithDomainName() { >try { > String hostNameWithDot = "testhost.testdomain"; > // load one split > Set splits = new HashSet(); > splits.add(new LocatableInputSplit(0, hostNameWithDot)); > // get next split for the host > LocatableInputSplitAssigner ia = new > LocatableInputSplitAssigner(splits); > InputSplit is = null; > ia.getNextInputSplit(hostNameWithDot, 0); > // there should be exactly zero remote and one local assignment > assertEquals(0, ia.getNumberOfRemoteAssignments()); > assertEquals(1, ia.getNumberOfLocalAssignments()); >} >catch (Exception e) { > e.printStackTrace(); > fail(e.getMessage()); >} > } > {code} > I also experienced this error in practice, and will later today open a pull > request to fix it. > > Note: I'm not sure if I selected the correct component category. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r285374038 ## File path: docs/dev/stream/state/state.md ## @@ -400,6 +400,17 @@ This option is not applicable for the incremental checkpointing in the RocksDB s - For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`, e.g. after restart from savepoint. + Cleanup in background + +Besides cleanup in full snapshot, you also can activate the cleanup in background. The option will adapt +all the cleanup strategies with default values. Users who do not need to think about details or used backend. +Currently, Flink provide two kind of cleanup strategies in background and the default config value list below: + +* Incremental cleanup(cleanup size: 10, run cleanup for every record: true) +* Cleanup during RocksDB compaction(query time after how many entries: 1000) + +If users who want to get more fine-grained control about the special cleanup in background can use it directly. + Review comment: We also need a code example here: ``` Besides cleanup in full snapshot, you can also activate the cleanup in background. This option will activate a strategy configured by default if the background cleanup is supported for the used backend. This feature can be activated in `StateTtlConfig`: << ... code example is needed like in other chapters > For more fine-grained control over some special cleanup in background, you can configure it separately as described below. ``` This should go to the respective chapters where the parameter values are discussed: ``` * Incremental cleanup(cleanup size: 10, run cleanup for every record: true) * Cleanup during RocksDB compaction(query time after how many entries: 1000) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
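A minimal sketch of the kind of example the reviewer asks for, assuming the `StateTtlConfig` builder exposes the `cleanupInBackground()` switch this PR proposes (the exact method name in the final PR may differ):
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
	.newBuilder(Time.days(1))
	// let the configured state backend pick its supported background cleanup
	// strategy with the default parameters described above
	.cleanupInBackground()
	.build();
```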
[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig URL: https://github.com/apache/flink/pull/8459#discussion_r285373468 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ## @@ -202,6 +204,15 @@ private IS createReducingState() throws Exception { } private TtlIncrementalCleanup getTtlIncrementalCleanup() { + //if cleanup in background and user did not specify the cleanup strategy, + //then build the incremental cleanup strategy and apply the default value + if (ttlConfig.isCleanupInBackground() && + ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy() == null) { + ttlConfig.getCleanupStrategies().activate( Review comment: I would be against modifying `ttlConfig` in any way. It is supposed to be immutable, once created by its builder. This protects against implicit changes of `ttlConfig` which is created by user. We should actually make `CleanupStrategies.strategies/activate` private or better even move `CleanupStrategies.strategies` to builder (instead of having there `CleanupStrategies`) and remove `CleanupStrategies.activate`. The builder should then also build immutable `CleanupStrategies` for `StateTtlConfig` at the end. If we move `isCleanupInBackground` to `CleanupStrategies` with its getter (where it rather belongs anyways) we can actually just modify `CleanupStrategies.getIncrementalCleanupStrategy/getRocksdbCompactFilterCleanupStrategy` to return default strategies as `strategies.getOrDefault` if they are not set but `isCleanupInBackground` is set. The default strategy singletons can be static final constants in respective classes. The backends and private field modifiers can also stay untouched but any strategy and backend can still independently decide what to use if `isCleanupInBackground` is set. `CleanupStrategies.inFullSnapshot/inRocksdbCompactFilter` should use the respective getters != null then, not `CleanupStrategies.strategies` directly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
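A hypothetical sketch of the `getOrDefault` accessor the reviewer describes; the class, field, and constant names below are assumptions for illustration, not code taken from the PR:
```java
import java.io.Serializable;
import java.util.EnumMap;

public class CleanupStrategiesSketch implements Serializable {

	enum Strategies { FULL_STATE_SCAN_SNAPSHOT, INCREMENTAL_CLEANUP, ROCKSDB_COMPACTION_FILTER }

	interface CleanupStrategy extends Serializable {}

	static class IncrementalCleanupStrategy implements CleanupStrategy {
		// default strategy as a static final constant, as suggested in the review
		static final IncrementalCleanupStrategy DEFAULT = new IncrementalCleanupStrategy(10, true);
		final int cleanupSize;
		final boolean runCleanupForEveryRecord;

		IncrementalCleanupStrategy(int cleanupSize, boolean runCleanupForEveryRecord) {
			this.cleanupSize = cleanupSize;
			this.runCleanupForEveryRecord = runCleanupForEveryRecord;
		}
	}

	// both fields are set once by the builder, keeping the config immutable
	private final EnumMap<Strategies, CleanupStrategy> strategies;
	private final boolean isCleanupInBackground;

	CleanupStrategiesSketch(EnumMap<Strategies, CleanupStrategy> strategies, boolean isCleanupInBackground) {
		this.strategies = strategies;
		this.isCleanupInBackground = isCleanupInBackground;
	}

	// returns the explicitly configured strategy, or the default one when only
	// the generic background cleanup flag was set
	public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
		IncrementalCleanupStrategy defaultStrategy =
			isCleanupInBackground ? IncrementalCleanupStrategy.DEFAULT : null;
		return (IncrementalCleanupStrategy) strategies.getOrDefault(
			Strategies.INCREMENTAL_CLEANUP, defaultStrategy);
	}
}
```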
[jira] [Updated] (FLINK-12299) ExecutionConfig#setAutoWatermarkInterval should check param(interval should not less than zero)
[ https://issues.apache.org/jira/browse/FLINK-12299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Zagrebin updated FLINK-12299: Component/s: (was: Runtime / Coordination) Runtime / Operators API / DataStream > ExecutionConfig#setAutoWatermarkInterval should check param(interval should > not less than zero) > --- > > Key: FLINK-12299 > URL: https://issues.apache.org/jira/browse/FLINK-12299 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Runtime / Operators >Reporter: shiwuliang >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In any scenario, `autoWatermarkInterval` should not be less than or equal to > zero. > First of all, a non-positive value does not correspond to the meaning of > `autoWatermarkInterval`. > Second, in the case where `autoWatermarkInterval` is less than 0, the periodic > watermark timer will never be registered in > `TimestampsAndPeriodicWatermarksOperator#open`, which will result in the > watermark of this stream being stuck at its lowest value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
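A minimal sketch of the parameter check the issue asks for, assuming Flink's org.apache.flink.util.Preconditions utility; whether a zero interval should also be rejected is left to the actual fix:
{code:java}
import org.apache.flink.util.Preconditions;

// Hypothetical guard inside ExecutionConfig#setAutoWatermarkInterval:
public ExecutionConfig setAutoWatermarkInterval(long interval) {
	// reject negative intervals early instead of silently disabling periodic watermarks
	Preconditions.checkArgument(interval >= 0, "Auto watermark interval must not be negative.");
	this.autoWatermarkInterval = interval;
	return this;
}
{code}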