[jira] [Commented] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-19 Thread Quan Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843673#comment-16843673
 ] 

Quan Shi commented on FLINK-12550:
--

How about replacing the host with the FQDN in JobMaster?

 
{code:java}
// Code in method JobMaster.requestNextInputSplit:
final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;

// to

final String host = slot != null ? slot.getTaskManagerLocation().getFQDNHostname() : null;
{code}
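
To illustrate why the form of the hostname matters in a locality check, here is a minimal, self-contained sketch. It is not Flink code: the class `HostnameFormSketch` and its helpers are hypothetical, and the thread does not confirm how `LocatableInputSplitAssigner` actually compares names. The sketch only shows that a short name and an FQDN never match under an exact string comparison unless both sides are brought into the same form first.

```java
import java.util.Locale;

class HostnameFormSketch {

    // Hypothetical helper: lower-cases a name and drops everything after the first dot.
    static String shortName(String host) {
        String lower = host.toLowerCase(Locale.ROOT);
        int dot = lower.indexOf('.');
        return dot < 0 ? lower : lower.substring(0, dot);
    }

    // Exact comparison: only matches when both sides already use the same form.
    static boolean sameHostExact(String a, String b) {
        return a.equals(b);
    }

    // Normalized comparison: both sides are reduced to the short name before comparing.
    static boolean sameHostNormalized(String a, String b) {
        return shortName(a).equals(shortName(b));
    }

    public static void main(String[] args) {
        String fqdn = "testhost.testdomain";
        String shortForm = "testhost";
        System.out.println(sameHostExact(fqdn, shortForm));      // false
        System.out.println(sameHostNormalized(fqdn, shortForm)); // true
    }
}
```

Whichever form is chosen, it has to be applied on both sides of the comparison. Note that reducing names to the short form treats hosts with the same short name in different domains as identical, which may or may not be acceptable.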
 

 

> hostnames with a dot never receive local input splits
> -
>
> Key: FLINK-12550
> URL: https://issues.apache.org/jira/browse/FLINK-12550
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataSet
>Affects Versions: 1.8.0
>Reporter: Felix seibert
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> LocatableInputSplitAssigner (in package api.common.io) fails to assign local 
> input splits to hosts whose hostname contains a dot ("."). To reproduce, add 
> the following test to LocatableSplitAssignerTest and execute it. It will 
> always fail. In my view, this is contrary to the expected behaviour, which is 
> that the host should obtain the one split that is stored on the very same 
> machine.
>  
> {code:java}
> @Test
> public void testLocalSplitAssignmentForHostWithDomainName() {
>     try {
>         String hostNameWithDot = "testhost.testdomain";
>         // load one split
>         Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
>         splits.add(new LocatableInputSplit(0, hostNameWithDot));
>         // get next split for the host
>         LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
>         InputSplit is = null;
>         ia.getNextInputSplit(hostNameWithDot, 0);
>         // there should be exactly zero remote and one local assignment
>         assertEquals(0, ia.getNumberOfRemoteAssignments());
>         assertEquals(1, ia.getNumberOfLocalAssignments());
>     }
>     catch (Exception e) {
>         e.printStackTrace();
>         fail(e.getMessage());
>     }
> }
> {code}
> I also experienced this error in practice, and will later today open a pull 
> request to fix it.
>  
> Note: I'm not sure if I selected the correct component category.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-19 Thread GitBox
yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r285428078
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaExampleUtil.java
 ##
 @@ -45,6 +45,7 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
 		env.getConfig().disableSysoutLogging();
 		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 1));
 		env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
+		env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
 
 Review comment:
   I have added a new test case to verify the exception behavior.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support 
partition related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#discussion_r285426012
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -407,4 +443,201 @@ private Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
}
}
+
+   // -- partitions --
+
+   @Override
+   public boolean partitionExists(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+   throws CatalogException {
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   return client.getPartition(tablePath.getDatabaseName(), 
tablePath.getObjectName(),
+   getFullPartitionValues(tablePath, partitionSpec, getFieldNames(hiveTable.getPartitionKeys()))) != null;
+   } catch (NoSuchObjectException | TableNotExistException | 
PartitionSpecInvalidException e) {
+   return false;
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get partition %s of 
table %s", partitionSpec, tablePath), e);
+   }
+   }
+
+   @Override
+   public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+   throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionAlreadyExistsException, 
CatalogException {
+
+   validateCatalogPartition(partition);
+
+   Table hiveTable = getHiveTable(tablePath);
+
+   ensurePartitionedTable(tablePath, hiveTable);
+
+   try {
+   client.add_partition(createHivePartition(hiveTable, 
partitionSpec, partition));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to create partition %s of 
table %s", partitionSpec, tablePath));
+   }
+   }
+
+   @Override
+   public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, boolean ignoreIfNotExists)
+   throws PartitionNotExistException, CatalogException {
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   ensurePartitionedTable(tablePath, hiveTable);
+   client.dropPartition(tablePath.getDatabaseName(), 
tablePath.getObjectName(),
+   getFullPartitionValues(tablePath, 
partitionSpec, getFieldNames(hiveTable.getPartitionKeys())), true);
+   } catch (NoSuchObjectException e) {
+   if (!ignoreIfNotExists) {
+   throw new 
PartitionNotExistException(catalogName, tablePath, partitionSpec);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to drop partition %s of 
table %s", partitionSpec, tablePath));
+   } catch (TableNotPartitionedException | TableNotExistException 
| PartitionSpecInvalidException e) {
+   throw new PartitionNotExistException(catalogName, 
tablePath, partitionSpec, e);
+   }
+   }
+
+   @Override
+   public List listPartitions(ObjectPath tablePath)
+   throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+   Table hiveTable = getHiveTable(tablePath);
+
+   ensurePartitionedTable(tablePath, hiveTable);
+
+   try {
+   return 
client.listPartitionNames(tablePath.getDatabaseName(), 
tablePath.getObjectName(), (short) -1).stream()
+   
.map(HiveCatalogBase::createPartitionSpec).collect(Collectors.toList());
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to list partitions of 
table %s", tablePath), e);
+   }
+   }
+
+   @Override
+   public List listPartitions(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+   throws TableNotExistException, TableNotPartitionedException, 
CatalogException {
+   Table hiveTable = 

[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-19 Thread GitBox
yanghua commented on issue #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-493837160
 
 
   Triggered again; the build still hits the SQL deadlock: 
https://travis-ci.org/apache/flink/jobs/534620910




[GitHub] [flink] xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support 
partition related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#discussion_r285427319
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -407,4 +443,201 @@ private Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
}
}
+
+   // -- partitions --
+
+   @Override
+   public boolean partitionExists(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+   throws CatalogException {
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   return client.getPartition(tablePath.getDatabaseName(), 
tablePath.getObjectName(),
+   getFullPartitionValues(tablePath, partitionSpec, getFieldNames(hiveTable.getPartitionKeys()))) != null;
+   } catch (NoSuchObjectException | TableNotExistException | 
PartitionSpecInvalidException e) {
+   return false;
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get partition %s of 
table %s", partitionSpec, tablePath), e);
+   }
+   }
+
+   @Override
+   public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+   throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionAlreadyExistsException, 
CatalogException {
+
+   validateCatalogPartition(partition);
+
+   Table hiveTable = getHiveTable(tablePath);
+
+   ensurePartitionedTable(tablePath, hiveTable);
+
+   try {
+   client.add_partition(createHivePartition(hiveTable, 
partitionSpec, partition));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
PartitionAlreadyExistsException(catalogName, tablePath, partitionSpec);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to create partition %s of 
table %s", partitionSpec, tablePath));
+   }
+   }
+
+   @Override
+   public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, boolean ignoreIfNotExists)
+   throws PartitionNotExistException, CatalogException {
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   ensurePartitionedTable(tablePath, hiveTable);
+   client.dropPartition(tablePath.getDatabaseName(), 
tablePath.getObjectName(),
 
 Review comment:
   getHiveTable() requires a round trip. Rather than aiming for a more precise 
exception message, I'd say we should minimize the number of round trips.
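
The trade-off raised in this comment can be sketched as follows. This is a hypothetical illustration, not the HiveCatalog code under review: `CountingClient` is an invented stand-in for a remote metastore client in which every call counts as one round trip, and the method names are assumptions.

```java
import java.util.Arrays;
import java.util.List;

class RoundTripSketch {

    // Hypothetical stand-in for a remote metastore client; each call is one round trip.
    static class CountingClient {
        int roundTrips = 0;

        List<String> getPartitionKeys(String table) {
            roundTrips++;
            return Arrays.asList("year", "month");
        }

        void dropPartition(String table, List<String> partitionValues) {
            roundTrips++;
        }
    }

    // Variant 1: look up table metadata first, which allows a precise error message.
    static int dropWithLookup(CountingClient client, String table, List<String> values) {
        List<String> keys = client.getPartitionKeys(table);
        if (keys.size() != values.size()) {
            throw new IllegalArgumentException("Partition spec does not match keys " + keys);
        }
        client.dropPartition(table, values);
        return client.roundTrips;
    }

    // Variant 2: drop directly and let the remote side reject invalid input.
    static int dropDirectly(CountingClient client, String table, List<String> values) {
        client.dropPartition(table, values);
        return client.roundTrips;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("2019", "05");
        System.out.println(dropWithLookup(new CountingClient(), "t", values)); // 2
        System.out.println(dropDirectly(new CountingClient(), "t", values));   // 1
    }
}
```

The first variant pays one extra remote call per operation for better diagnostics. The second halves the number of calls but can only report whatever error the remote side returns.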




[GitHub] [flink] leesf commented on issue #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase

2019-05-19 Thread GitBox
leesf commented on issue #8226: [FLINK-12181][Tests] Port 
ExecutionGraphRestartTest to new codebase
URL: https://github.com/apache/flink/pull/8226#issuecomment-493836535
 
 
   @azagrebin Thanks for opening another PR, I will close this one.




[GitHub] [flink] leesf closed pull request #8226: [FLINK-12181][Tests] Port ExecutionGraphRestartTest to new codebase

2019-05-19 Thread GitBox
leesf closed pull request #8226: [FLINK-12181][Tests] Port 
ExecutionGraphRestartTest to new codebase
URL: https://github.com/apache/flink/pull/8226
 
 
   




[GitHub] [flink] xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8449: [FLINK-12235][hive] Support 
partition related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#discussion_r285427041
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -407,4 +443,201 @@ private Table getHiveTable(ObjectPath tablePath) throws 
TableNotExistException {
String.format("Failed to get table %s from Hive 
metastore", tablePath.getFullName()), e);
}
}
+
+   // -- partitions --
+
+   @Override
+   public boolean partitionExists(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+   throws CatalogException {
+   try {
+   Table hiveTable = getHiveTable(tablePath);
+   return client.getPartition(tablePath.getDatabaseName(), 
tablePath.getObjectName(),
+   getFullPartitionValues(tablePath, partitionSpec, getFieldNames(hiveTable.getPartitionKeys()))) != null;
+   } catch (NoSuchObjectException | TableNotExistException | 
PartitionSpecInvalidException e) {
+   return false;
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to get partition %s of 
table %s", partitionSpec, tablePath), e);
+   }
+   }
+
+   @Override
+   public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
+   throws TableNotExistException, TableNotPartitionedException, 
PartitionSpecInvalidException, PartitionAlreadyExistsException, 
CatalogException {
+
+   validateCatalogPartition(partition);
+
+   Table hiveTable = getHiveTable(tablePath);
+
+   ensurePartitionedTable(tablePath, hiveTable);
+
+   try {
+   client.add_partition(createHivePartition(hiveTable, 
partitionSpec, partition));
 
 Review comment:
   However, additional trips to HMS should be avoided if we can. I guess 
getHiveTable() requires a round trip.




[GitHub] [flink] xuefuz commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and 
GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#issuecomment-493834540
 
 
   The PR is updated based on review feedback. Regarding the new files: they 
should have been part of the first commit, but I forgot to include them. 
However, they cannot go into a separate JIRA, because the first commit needs 
them and cannot stand on its own. When we merge the PR, we can combine all 
commits into the first one, right? I'm not sure what the issue is. However, I 
can recommit the PR and do a forced push.




[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285425254
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
 Review comment:
   Updated




[GitHub] [flink] JingsongLi commented on issue #8397: [FLINK-11421][Table SQL/Runtime]Add compilation options to allow comp…

2019-05-19 Thread GitBox
JingsongLi commented on issue #8397: [FLINK-11421][Table SQL/Runtime]Add 
compilation options to allow comp…
URL: https://github.com/apache/flink/pull/8397#issuecomment-493833300
 
 
   @liyafan82 Thanks for your local benchmark.
   I think it is important to try to reproduce the results on a local machine. 
As you say, a cluster environment is very complicated. In that case, the test 
data may not tell where optimizations exist and where regressions occur.
   The compiler optimization is easy to reproduce on a single machine. 
Everything is mockable.
   
   Can you add some other patterns? Like vector computation benchmark?




[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285423726
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -48,6 +48,9 @@
  * A generic catalog implementation that holds all meta objects in memory.
  */
 public class GenericInMemoryCatalog implements Catalog {
+   public static final String FLINK_META_PROPERTY_KEY = "is_generic";
 
 Review comment:
   This key is added automatically as part of metaobject instantiation. It's 
subject to mask/retrieve operations.




[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285423478
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+   

[jira] [Commented] (FLINK-12348) Make TableConfig configurable from string-string map

2019-05-19 Thread Jing Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843630#comment-16843630
 ] 

Jing Zhang commented on FLINK-12348:


[~twalthr], do you prefer that PlannerConfig provides methods for setting 
planner-related string-string pairs, while the underlying map is stored as a 
member of TableConfig?
 [~jark]'s concern also makes sense. How about the following solution?
We provide two options for users to get/set planner configuration: 
 1. Directly read/write a member `org.apache.flink.configuration.Configuration` 
in TableConfig based on a certain key. This way, there is no difference between 
setting a planner-related configuration and an API-related configuration. 
However, users need to know the exact key string. 
 2. Call methods in PlannerConfig to read/write the configuration. This way, 
users must know that API-related configurations need to be configured via 
TableConfig and planner-related configurations via PlannerConfig. However, 
users do not need to care about the exact key string.
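
A minimal sketch of how the two options could coexist on top of a single string-string map is shown below. All names are hypothetical: `SketchTableConfig` and `SketchPlannerConfig` stand in for the real classes, a plain `HashMap` stands in for `org.apache.flink.configuration.Configuration`, and the key `planner.join-reorder-enabled` is invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class PlannerConfigSketch {

    // Hypothetical key name, used only for this illustration.
    static final String JOIN_REORDER_KEY = "planner.join-reorder-enabled";

    // Stand-in for TableConfig: owns the underlying string-string map (option 1).
    static class SketchTableConfig {
        private final Map<String, String> conf = new HashMap<>();

        void setString(String key, String value) {
            conf.put(key, value);
        }

        String getString(String key, String defaultValue) {
            return conf.getOrDefault(key, defaultValue);
        }

        SketchPlannerConfig plannerConfig() {
            return new SketchPlannerConfig(this);
        }
    }

    // Stand-in for PlannerConfig: typed accessors that hide the key string (option 2).
    static class SketchPlannerConfig {
        private final SketchTableConfig tableConfig;

        SketchPlannerConfig(SketchTableConfig tableConfig) {
            this.tableConfig = tableConfig;
        }

        void setJoinReorderEnabled(boolean enabled) {
            tableConfig.setString(JOIN_REORDER_KEY, Boolean.toString(enabled));
        }

        boolean isJoinReorderEnabled() {
            return Boolean.parseBoolean(tableConfig.getString(JOIN_REORDER_KEY, "false"));
        }
    }

    public static void main(String[] args) {
        SketchTableConfig tableConfig = new SketchTableConfig();
        tableConfig.plannerConfig().setJoinReorderEnabled(true);               // option 2: typed method
        System.out.println(tableConfig.getString(JOIN_REORDER_KEY, "false"));  // true
        tableConfig.setString(JOIN_REORDER_KEY, "false");                      // option 1: raw key
        System.out.println(tableConfig.plannerConfig().isJoinReorderEnabled()); // false
    }
}
```

Because both paths write to the same map, a value set through one is visible through the other, so users can pick whichever style suits them.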

> Make TableConfig configurable from string-string map
> 
>
> Key: FLINK-12348
> URL: https://issues.apache.org/jira/browse/FLINK-12348
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / API
>Reporter: Jing Zhang
>Assignee: Jing Zhang
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Since TableConfig already moved to API module in 
> [FLINK-11067|https://issues.apache.org/jira/browse/FLINK-11067], TableConfig 
> in blink-planner-module should not exist anymore. The issue aims to remove 
> the TableConfig in blink-planner-module, use TableConfig in API module 
> instead.





[jira] [Commented] (FLINK-11560) Translate "Flink Applications" page into Chinese

2019-05-19 Thread Zhou Yumin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843623#comment-16843623
 ] 

Zhou Yumin commented on FLINK-11560:


Since the old PR [https://github.com/apache/flink-web/pull/182] for it has not 
been updated for months and the issue was still unassigned when I started, I 
decided to take the issue. Is that fine with you, [~jark] & [~stephenye]? I 
have created a new PR, [https://github.com/apache/flink-web/pull/215], for it. 
This is one of the last open subtasks of the website translation. I hope for a 
quick review despite your heavy workload, so that we can complete FLINK-11526. 
Thanks.

> Translate "Flink Applications" page into Chinese
> 
>
> Key: FLINK-11560
> URL: https://issues.apache.org/jira/browse/FLINK-11560
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Jark Wu
>Assignee: Zhou Yumin
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Translate "Flink Applications" page into Chinese.
> The markdown file is located in: flink-web/flink-applications.zh.md
> The url link is: https://flink.apache.org/zh/flink-applications.html
> Please adjust the links in the page to Chinese pages when translating. 





[GitHub] [flink] lirui-apache commented on issue #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog

2019-05-19 Thread GitBox
lirui-apache commented on issue #8449: [FLINK-12235][hive] Support partition 
related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#issuecomment-493827022
 
 
   @bowenli86 @xuefuz thanks for your comments. Please take a look at the 
updated PR. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-19 Thread GitBox
yanghua commented on issue #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-493826613
 
 
   @StefanRRichter After thinking about this carefully, my point of view has changed. 
   
   There are two options:
   
   * to keep the default behavior of all test cases and user jobs, the default 
value of `tolerableCheckpointFailureNumber` should be `Integer.MAX_VALUE`;
   * to keep compatibility, the default value of 
`tolerableCheckpointFailureNumber` should be 0.
   
   The key problem is that the **range of action** of the two options is different. The 
`failOnCheckpointingErrors` option only covers the task's checkpointing errors on 
the TaskManager side (not even the whole execution phase), while the 
`tolerableCheckpointFailureNumber` option needs to cover the whole trigger and 
execution phases.
   
   The argument for option 1:
   
   In many scenarios, if we set the default value of 
`tolerableCheckpointFailureNumber` to 0, the behavior of users' jobs would change: 
jobs would fail and restart more frequently. For example, a task that is not ready to 
checkpoint sends a decline message, which triggers the failure manager to fail and 
restart the job. This changes the default behavior of test cases and user jobs. 
This is why I changed the default value to `Integer.MAX_VALUE`, 
even though such failures are sporadic.
   
   The argument for option 2:
   
   If we set the default value of `tolerableCheckpointFailureNumber` to 
`Integer.MAX_VALUE`, it may introduce a **compatibility issue**: how should we handle 
the `failOnCheckpointingErrors` config option in the future? The original 
idea was that `failOnCheckpointingErrors` (two values: 0 and 
`Integer.MAX_VALUE`) is a subset of `tolerableCheckpointFailureNumber` (0 to 
`Integer.MAX_VALUE`). We had wanted to deprecate the `failOnCheckpointingErrors` 
option in the third step.
   
   What do you think?
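
   To make the difference between the two default values concrete, here is a minimal sketch of a failure counter. It is not code from this PR: the class `TolerableFailureCounter` and its methods are hypothetical, and it assumes that failures are counted consecutively and that a successful checkpoint resets the count, which the discussion above does not specify.

```java
/**
 * Hypothetical illustration only; this is not the CheckpointFailureManager from the PR.
 * Assumption: failures are counted consecutively and reset by a successful checkpoint.
 */
final class TolerableFailureCounter {

    private final int tolerableFailures;
    private int consecutiveFailures;

    TolerableFailureCounter(int tolerableFailures) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
    }

    /** Records one failed checkpoint and reports whether the job should now fail. */
    boolean onCheckpointFailure() {
        if (consecutiveFailures < Integer.MAX_VALUE) {
            consecutiveFailures++; // saturate instead of overflowing
        }
        return consecutiveFailures > tolerableFailures;
    }

    /** A completed checkpoint clears the failure streak. */
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }

    public static void main(String[] args) {
        TolerableFailureCounter strict = new TolerableFailureCounter(0);
        TolerableFailureCounter lenient = new TolerableFailureCounter(Integer.MAX_VALUE);
        System.out.println("limit 0, first failure fails job: " + strict.onCheckpointFailure());
        System.out.println("limit MAX_VALUE, first failure fails job: " + lenient.onCheckpointFailure());
    }
}
```

   Under these assumptions, a limit of 0 fails the job on the first declined checkpoint, while a limit of `Integer.MAX_VALUE` effectively never does. That is the behavioral gap between the two options.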


[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285416058
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -192,46 +590,62 @@ protected Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
"HiveCatalog only supports HiveCatalogTable and 
HiveCatalogView");
}
 
-   hiveTable.setSd(sd);
-
return hiveTable;
}
 
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private static Map retrieveFlinkProperties(Map hiveTableParams) {
+   return hiveTableParams.entrySet().stream()
+   .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+   .collect(Collectors.toMap(e -> 
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+   }
+
+   /**
+* Add a prefix to Flink-created properties to distinguish them from 
Hive-created properties.
+*/
+   private static Map maskFlinkProperties(Map properties) {
+   return properties.entrySet().stream()
+   .filter(e -> e.getKey() != null && e.getValue() != null)
+   .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + 
e.getKey(), e -> e.getValue()));
+   }
+
// -- partitions --
 
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
-   throws TableNotExistException, 
TableNotPartitionedException, PartitionSpecInvalidException, 
PartitionAlreadyExistsException, CatalogException {
 
 Review comment:
   ok
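
   For readers following the hunk above, here is a small standalone sketch of the prefix round trip performed by `maskFlinkProperties` and `retrieveFlinkProperties`. It is an illustration, not the PR's code: the class name `FlinkPropertyMasking`, the `Map<String, String>` typing, and the use of `substring` instead of `replace` are assumptions. `String.replace` removes every occurrence of the prefix, so a key that itself contains `flink.` would not survive the round trip; `substring` strips only the leading prefix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical standalone sketch; names and generic types are assumptions. */
final class FlinkPropertyMasking {

    static final String FLINK_PROPERTY_PREFIX = "flink.";

    /** Prefixes every Flink property so it can be told apart from Hive-created ones. */
    static Map<String, String> mask(Map<String, String> properties) {
        return properties.entrySet().stream()
            // Collectors.toMap rejects null values, so drop null keys/values first
            .filter(e -> e.getKey() != null && e.getValue() != null)
            .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), Map.Entry::getValue));
    }

    /** Keeps only prefixed entries and strips the leading prefix exactly once. */
    static Map<String, String> retrieve(Map<String, String> hiveTableParams) {
        return hiveTableParams.entrySet().stream()
            .filter(e -> e.getKey() != null && e.getValue() != null)
            .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
            .collect(Collectors.toMap(
                e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()),
                Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> flinkProps = new HashMap<>();
        flinkProps.put("connector.flink.version", "1.9");

        Map<String, String> stored = mask(flinkProps);
        stored.put("transient_lastDdlTime", "1558224000"); // stands in for a Hive-created property

        System.out.println(retrieve(stored).equals(flinkProps)); // prints "true"
    }
}
```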


[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285415666
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+   

[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285415326
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+   

[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285415235
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
 
 Review comment:
   Updated


[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285415211
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+   

[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285415172
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+   

[GitHub] [flink] xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285414379
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+   

[GitHub] [flink] yanghua edited a comment on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-19 Thread GitBox
yanghua edited a comment on issue #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-493817814
 
 
   After fixing the state evolution issue, the Travis build still fails. The cause 
is a deadlock in a SQL test. Details: 
https://travis-ci.org/apache/flink/jobs/534067083


[GitHub] [flink] Aitozi commented on issue #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-05-19 Thread GitBox
Aitozi commented on issue #8479: [FLINK-11193][State Backends]Use user passed 
configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#issuecomment-493818308
 
 
   The travis-ci failure seems unrelated


[GitHub] [flink] lirui-apache commented on a change in pull request #8449: [FLINK-12235][hive] Support partition related operations in HiveCatalog

2019-05-19 Thread GitBox
lirui-apache commented on a change in pull request #8449: [FLINK-12235][hive] 
Support partition related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8449#discussion_r285413443
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -200,44 +200,56 @@ protected Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
// -- partitions --
 
@Override
-   public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
-   throws TableNotExistException, 
TableNotPartitionedException, PartitionSpecInvalidException, 
PartitionAlreadyExistsException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, boolean ignoreIfNotExists)
-   throws PartitionNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists)
-   throws PartitionNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
-
-   @Override
-   public List listPartitions(ObjectPath tablePath)
-   throws TableNotExistException, 
TableNotPartitionedException, CatalogException {
-   throw new UnsupportedOperationException();
+   protected void validateCatalogPartition(CatalogPartition 
catalogPartition) throws CatalogException {
+   if (!(catalogPartition instanceof HiveCatalogPartition)) {
+   throw new CatalogException(String.format("%s can only 
handle %s but got %s", getClass().getSimpleName(),
+   HiveCatalogPartition.class.getSimpleName(), 
catalogPartition.getClass().getSimpleName()));
+   }
}
 
@Override
-   public List listPartitions(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
-   throws TableNotExistException, 
TableNotPartitionedException, CatalogException {
-   throw new UnsupportedOperationException();
-   }
+   protected Partition createHivePartition(Table hiveTable, 
CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition)
+   throws PartitionSpecInvalidException {
+   Partition partition = new Partition();
+   List<String> partCols = 
getFieldNames(hiveTable.getPartitionKeys());
+   List<String> partValues = getFullPartitionValues(new 
ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()),
+   partitionSpec, partCols);
+   // validate partition values
+   for (int i = 0; i < partCols.size(); i++) {
+   if 
(StringUtils.isNullOrWhitespaceOnly(partValues.get(i))) {
+   throw new 
PartitionSpecInvalidException(catalogName, partCols,
+   new ObjectPath(hiveTable.getDbName(), 
hiveTable.getTableName()), partitionSpec);
+   }
+   }
+   HiveCatalogPartition hiveCatalogPartition = 
(HiveCatalogPartition) catalogPartition;
+   partition.setValues(partValues);
+   partition.setDbName(hiveTable.getDbName());
+   partition.setTableName(hiveTable.getTableName());
+   partition.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+   partition.setParameters(hiveCatalogPartition.getProperties());
+   partition.setSd(hiveTable.getSd().deepCopy());
+
+   String location = hiveCatalogPartition.getLocation();
+   if (null == location) {
 
 Review comment:
   Makes sense. I'll give it a try.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] yanghua commented on issue #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure

2019-05-19 Thread GitBox
yanghua commented on issue #8322: [FLINK-12364] Introduce a 
CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#issuecomment-493817814
 
 
   Retriggered once; the build failed again. Reason: deadlock (SQL). Details: 
https://travis-ci.org/apache/flink/jobs/534067083




[GitHub] [flink] flinkbot commented on issue #8481: Release 1.7

2019-05-19 Thread GitBox
flinkbot commented on issue #8481: Release 1.7
URL: https://github.com/apache/flink/pull/8481#issuecomment-493816547
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The bot tracks the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] zehant opened a new pull request #8481: Release 1.7

2019-05-19 Thread GitBox
zehant opened a new pull request #8481: Release 1.7
URL: https://github.com/apache/flink/pull/8481
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[GitHub] [flink] beyond1920 commented on a change in pull request #8462: [FLINK-12496][table-planner-blink] Support translation from StreamExecGroupWindowAggregate to StreamTransformation.

2019-05-19 Thread GitBox
beyond1920 commented on a change in pull request #8462: 
[FLINK-12496][table-planner-blink] Support translation from 
StreamExecGroupWindowAggregate to StreamTransformation.
URL: https://github.com/apache/flink/pull/8462#discussion_r285412015
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala
 ##
 @@ -695,15 +699,43 @@ object AggregateUtil extends Enumeration {
 (propPos._1, propPos._2, propPos._3)
   }
 
-  def isRowtimeIndicatorType(fieldType: TypeInformation[_]): Boolean = {
-TimeIndicatorTypeInfo.ROWTIME_INDICATOR == fieldType
+  def isRowtimeIndicatorType(fieldType: TypeInformation[_]): Boolean = 
fieldType match {
 
 Review comment:
   FlinkTypeFactory already references InternalType and RelDataType, and 
provides conversion functions between these two types. If we move 
`isRowtimeIndicatorType`, `isProctimeIndicatorType`, `isTimeIntervalType` and 
`isRowIntervalType` to FlinkTypeFactory, then FlinkTypeFactory would reference 
three types, which is not good. 




[jira] [Assigned] (FLINK-12088) Introduce unbounded streaming inner join operator

2019-05-19 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-12088:
--

Assignee: Jark Wu  (was: Kurt Young)

> Introduce unbounded streaming inner join operator
> -
>
> Key: FLINK-12088
> URL: https://issues.apache.org/jira/browse/FLINK-12088
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Kurt Young
>Assignee: Jark Wu
>Priority: Major
>
> This operator is responsible for the unbounded streaming inner join, and will be 
> optimized in the following cases:
> # If the join keys (with equality condition) are also the primary key, we will 
> have a more efficient state layout
> # If the inputs have primary keys, but the join keys are not the primary key, we can 
> also come up with an efficient state layout
> # If the inputs don't have primary keys, this will fall back to the default implementation
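
A minimal sketch of how the three cases above might be told apart, assuming that "join keys are also primary key" means the join keys cover every primary-key column. The class, enum, and method names are hypothetical and are not taken from the Flink code base; the issue does not describe the actual state layouts, so the sketch only selects a case.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: all names here are hypothetical, not Flink APIs. */
final class JoinStateLayoutSketch {

    /** One constant per case listed in the issue description. */
    enum StateLayout {
        JOIN_KEY_CONTAINS_PRIMARY_KEY, // case 1: join keys cover the primary key
        INPUT_HAS_PRIMARY_KEY,         // case 2: primary key exists, join keys do not cover it
        NO_PRIMARY_KEY                 // case 3: default implementation
    }

    /** Picks a case for one join input from its join keys and its primary key (may be empty). */
    static StateLayout chooseLayout(List<String> joinKeys, List<String> primaryKey) {
        if (primaryKey == null || primaryKey.isEmpty()) {
            return StateLayout.NO_PRIMARY_KEY;
        }
        Set<String> keys = new HashSet<>(joinKeys);
        if (keys.containsAll(primaryKey)) {
            // Assumption: "join keys are also primary key" means they include all PK columns.
            return StateLayout.JOIN_KEY_CONTAINS_PRIMARY_KEY;
        }
        return StateLayout.INPUT_HAS_PRIMARY_KEY;
    }
}
```

When the join keys cover the primary key, at most one row can exist per join-key value, which is presumably what makes a more compact layout possible in the first case.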



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11943) Support TopN feature for SQL

2019-05-19 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-11943.
--
   Resolution: Duplicate
Fix Version/s: 1.9.0

> Support TopN feature for SQL
> 
>
> Key: FLINK-11943
> URL: https://issues.apache.org/jira/browse/FLINK-11943
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.9.0
>
>
> TopN is a frequently used feature in data analysis. We can use ORDER BY + 
> LIMIT to easily express a TopN query, e.g. {{SELECT * FROM T ORDER BY amount 
> DESC LIMIT 10}}.
> But this is a global TopN; there is a strong demand for per-group TopN, 
> for example, the top 10 shops for each category. In order to avoid introducing 
> new syntax for this, we would like to express it with traditional syntax, 
> using {{ROW_NUMBER}} over a window + {{FILTER}} to limit the number of rows.
> For example:
> SELECT *
> FROM (
>   SELECT category, shopId, sales,
>  [ROW_NUMBER()|RANK()|DENSE_RANK()] OVER 
>   (PARTITION BY category ORDER BY sales ASC) as rownum
>   FROM shop_sales
> )
> WHERE rownum <= 10
> This issue aims to optimize this query into a {{Rank}} node instead of 
> {{Over}} plus {{Calc}}, and to translate the {{Rank}} node into physical 
> operators.
> There are several optimizations for the rank operator, depending on the input of 
> the Rank. We would like to implement the basic, one-size-fits-all 
> implementation first and do the performance improvements later. 
> Here is a brief design doc: 
> https://docs.google.com/document/d/14JCV6X6hcpoA51loprgntZNxQ2NmnDLucxgGY8xVDuI/edit#
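
The per-group TopN semantics of the query above can be sketched in plain Java, outside of Flink. This is only an illustration of what `ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales ASC)` followed by `rownum <= n` computes on a bounded input; the `TopNSketch` and `ShopSale` names are hypothetical, and nothing here reflects how the `Rank` operator is implemented.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a batch-style model of per-group TopN, not Flink code. */
final class TopNSketch {

    /** Hypothetical row type mirroring the columns of the example query. */
    static final class ShopSale {
        final String category;
        final String shopId;
        final long sales;

        ShopSale(String category, String shopId, long sales) {
            this.category = category;
            this.shopId = shopId;
            this.sales = sales;
        }
    }

    /** Keeps the first n rows of each category, ordered by sales ascending. */
    static Map<String, List<ShopSale>> topNPerCategory(List<ShopSale> rows, int n) {
        // PARTITION BY category
        Map<String, List<ShopSale>> byCategory = new LinkedHashMap<>();
        for (ShopSale row : rows) {
            byCategory.computeIfAbsent(row.category, k -> new ArrayList<>()).add(row);
        }
        Map<String, List<ShopSale>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<ShopSale>> entry : byCategory.entrySet()) {
            // ORDER BY sales ASC; the sort is stable, so ties keep input order
            List<ShopSale> sorted = new ArrayList<>(entry.getValue());
            sorted.sort(Comparator.comparingLong((ShopSale s) -> s.sales));
            // WHERE rownum <= n
            result.put(entry.getKey(), new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size()))));
        }
        return result;
    }
}
```

A streaming operator cannot sort the full partition like this; it has to maintain the current top rows per key incrementally, which is where the input-dependent optimizations mentioned in the issue would come in.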





[jira] [Closed] (FLINK-11944) Support FirstRow and LastRow for SQL

2019-05-19 Thread Kurt Young (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young closed FLINK-11944.
--
   Resolution: Duplicate
Fix Version/s: 1.9.0

> Support FirstRow and LastRow for SQL
> 
>
> Key: FLINK-11944
> URL: https://issues.apache.org/jira/browse/FLINK-11944
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.9.0
>
>
> The source often contains duplicate data for various reasons. In 
> order to get a correct result, we need to deduplicate it. FirstRow and 
> LastRow are two different deduplication strategies. The syntax of FirstRow 
> and LastRow is similar to TopN, but orders by a time attribute. For example: 
> SELECT *
> FROM (
>   SELECT *, 
>   ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime DESC) as rownum
>   FROM T
> )
> WHERE rownum = 1
> Some information about FirstRow & LastRow.
> 1. the partition by key is the deduplicate key
> 2. can only order by a time attribute (either proctime or rowtime)
> 3. the rownum filter must be {{= 1}} or {{<= 1}}
> 4. it is FirstRow when order direction is ASC,  LastRow when order direction 
> is DESC
> This issue aims to optimize this query into a FirstLastRow node instead of 
> Over plus Calc, and to translate it into physical operators.
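
The four rules above can be sketched as a small keyed deduplication in plain Java. This is a hedged illustration of the semantics only: `DeduplicateSketch`, `Row`, and `deduplicate` are hypothetical names, the time attribute is modelled as a plain `long`, and the tie-breaking rule (the later-arriving row wins for LastRow, the earlier-arriving row wins for FirstRow) is an assumption the issue does not state.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: models FirstRow/LastRow semantics, not the Flink operator. */
final class DeduplicateSketch {

    /** Hypothetical row: id is the deduplication key, time is the time attribute. */
    static final class Row {
        final String id;
        final long time;
        final String payload;

        Row(String id, long time, String payload) {
            this.id = id;
            this.time = time;
            this.payload = payload;
        }
    }

    /**
     * Keeps one row per id.
     * keepLast == false models ORDER BY time ASC  with rownum = 1 (FirstRow).
     * keepLast == true  models ORDER BY time DESC with rownum = 1 (LastRow).
     */
    static Map<String, Row> deduplicate(List<Row> rows, boolean keepLast) {
        Map<String, Row> kept = new LinkedHashMap<>();
        for (Row row : rows) {
            Row current = kept.get(row.id);
            if (current == null) {
                kept.put(row.id, row);
            } else if (keepLast ? row.time >= current.time : row.time < current.time) {
                // Replace the stored row only if the new one ranks ahead of it.
                kept.put(row.id, row);
            }
        }
        return kept;
    }
}
```

Only a single row per key has to be remembered, which is the main reason a dedicated node can be cheaper than a general Over plus Calc plan.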





[GitHub] [flink] bowenli86 commented on issue #8477: [FLINK-12241][hive] Support function related operations in GenericHiveMetastoreCatalog

2019-05-19 Thread GitBox
bowenli86 commented on issue #8477: [FLINK-12241][hive] Support function 
related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8477#issuecomment-493807628
 
 
   Let's wait for https://github.com/apache/flink/pull/8480 to be merged first, 
then I'll refactor and rebase this PR accordingly.




[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285392260
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+   

[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285404214
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+   

[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285405796
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -192,46 +590,62 @@ protected Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
"HiveCatalog only supports HiveCatalogTable and 
HiveCatalogView");
}
 
-   hiveTable.setSd(sd);
-
return hiveTable;
}
 
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
+   return hiveTableParams.entrySet().stream()
+   .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+   .collect(Collectors.toMap(e -> 
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+   }
+
+   /**
+* Add a prefix to Flink-created properties to distinguish them from 
Hive-created properties.
+*/
+   private static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
+   return properties.entrySet().stream()
+   .filter(e -> e.getKey() != null && e.getValue() != null)
+   .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + 
e.getKey(), e -> e.getValue()));
+   }
+
// -- partitions --
 
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec, CatalogPartition partition, boolean ignoreIfExists)
-   throws TableNotExistException, 
TableNotPartitionedException, PartitionSpecInvalidException, 
PartitionAlreadyExistsException, CatalogException {
 
 Review comment:
   Please revert the tab changes for all the partition APIs below.




[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285392064
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
 
 Review comment:
   update javadoc?




[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285404036
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = 
"flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf 
hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"catalogName cannot be null or empty");
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), 
"defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   
checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), 
"hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, 
hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive 
metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured 
default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+   

[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285405982
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 ##
 @@ -48,6 +48,9 @@
  * A generic catalog implementation that holds all meta objects in memory.
  */
 public class GenericInMemoryCatalog implements Catalog {
+   public static final String FLINK_META_PROPERTY_KEY = "is_generic";
 
 Review comment:
   This seems to duplicate HiveCatalog.GENERIC_META_PROPERTY_KEY. 
   
   We can either use 'is_generic' and `mask/retrieveFlinkProperties()` to deal 
with the `flink.` prefix, or just use 'flink.is_generic' after/before calls to 
`mask/retrieveFlinkProperties()`. Either way, we need to be consistent 




[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285403624
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic";
 
-   LOG.info("Created HiveCatalog '{}'", catalogName);
+   protected final String catalogName;
+   protected final HiveConf hiveConf;
+
+   private final String defaultDatabase;
+   protected IMetaStoreClient client;
+
+   public HiveCatalog(String catalogName, String hivemetastoreURI) {
+   this(catalogName, DEFAULT_DB, getHiveConf(hivemetastoreURI));
}
 
public HiveCatalog(String catalogName, HiveConf hiveConf) {
-   super(catalogName, hiveConf);
+   this(catalogName, DEFAULT_DB, hiveConf);
+   }
+
+   public HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), "catalogName cannot be null or empty");
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(defaultDatabase), "defaultDatabase cannot be null or empty");
+   this.catalogName = catalogName;
+   this.defaultDatabase = defaultDatabase;
+   this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null");
 
LOG.info("Created HiveCatalog '{}'", catalogName);
}
 
+   private static HiveConf getHiveConf(String hiveMetastoreURI) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveMetastoreURI), "hiveMetastoreURI cannot be null or empty");
+
+   HiveConf hiveConf = new HiveConf();
+   hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, hiveMetastoreURI);
+   return hiveConf;
+   }
+
+   private static IMetaStoreClient getMetastoreClient(HiveConf hiveConf) {
+   try {
+   return RetryingMetaStoreClient.getProxy(
+   hiveConf,
+   null,
+   null,
+   HiveMetaStoreClient.class.getName(),
+   true);
+   } catch (MetaException e) {
+   throw new CatalogException("Failed to create Hive metastore client", e);
+   }
+   }
+
+   @Override
+   public void open() throws CatalogException {
+   if (client == null) {
+   client = getMetastoreClient(hiveConf);
+   LOG.info("Connected to Hive metastore");
+   }
+
+   if (!databaseExists(defaultDatabase)) {
+   throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
+   defaultDatabase, catalogName));
+   }
+   }
+
+   @Override
+   public void close() throws CatalogException {
+   if (client != null) {
+   client.close();
+   client = null;
+   LOG.info("Close connection to Hive metastore");
+   }
+   }
+
// -- databases --
 
+   public String getDefaultDatabase() throws CatalogException {
+   return defaultDatabase;
+   }
+
@Override
-   protected CatalogDatabase createCatalogDatabase(Database hiveDatabase) {
-   return new HiveCatalogDatabase(
-   hiveDatabase.getParameters(),
-   hiveDatabase.getLocationUri(),
-   hiveDatabase.getDescription());
+   public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+   

[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285403193
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##

[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285404166
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##

[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285403774
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##

[GitHub] [flink] bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
bowenli86 commented on a change in pull request #8480: [FLINK-12552][table]: 
Combine HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#discussion_r285403349
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 ##
 @@ -54,117 +75,494 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A catalog implementation for Hive.
+ * Base class for catalogs backed by Hive metastore.
  */
-public class HiveCatalog extends HiveCatalogBase {
+public class HiveCatalog implements Catalog {
private static final Logger LOG = LoggerFactory.getLogger(HiveCatalog.class);
+   private static final String DEFAULT_DB = "default";
 
-   public HiveCatalog(String catalogName, String hivemetastoreURI) {
-   super(catalogName, hivemetastoreURI);
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+   private static final String GENERC_META_PROPERTY_KEY = "flink.is_generic";
 
 Review comment:
   In Flink we usually name a constant as close as possible to its actual 
value. The 'PROPERTY' in the name is also a bit confusing. I named the constant 
above 'FLINK_PROPERTY_PREFIX' because it is prepended to every property key in 
the properties, while this one is a single key itself.
   
   Should it be something like "FLINK_IS_GENERIC_KEY"?
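
A minimal sketch of the suggested naming, for illustration only. The class `FlinkIsGenericKeySketch` and the helper `isGeneric` are hypothetical and not part of the pull request; only the two constant names and their values come from the discussion above.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of the naming suggested in the review comment.
final class FlinkIsGenericKeySketch {
    // A prefix: prepended to every Flink-created property key.
    static final String FLINK_PROPERTY_PREFIX = "flink.";

    // A single, complete key; its name mirrors its value "flink.is_generic".
    static final String FLINK_IS_GENERIC_KEY = FLINK_PROPERTY_PREFIX + "is_generic";

    // Hypothetical helper: a missing key is treated as "not generic".
    public static boolean isGeneric(Map<String, String> properties) {
        return Boolean.parseBoolean(properties.get(FLINK_IS_GENERIC_KEY));
    }

    public static void main(String[] args) {
        System.out.println(FLINK_IS_GENERIC_KEY);
        System.out.println(isGeneric(Collections.singletonMap(FLINK_IS_GENERIC_KEY, "true")));
        System.out.println(isGeneric(Collections.<String, String>emptyMap()));
    }
}
```

Deriving the key from the prefix keeps the two constants from drifting apart if the prefix ever changes.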




[GitHub] [flink] asfgit closed pull request #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-19 Thread GitBox
asfgit closed pull request #8401: [FLINK-12407][python] Add all table operators 
align Java Table API.
URL: https://github.com/apache/flink/pull/8401
 
 
   




[jira] [Closed] (FLINK-12407) Add all table operators align Java Table API

2019-05-19 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12407.
---
Resolution: Fixed

Fixed in master: b3604f7bee7456b8533e9ea222a833a2624e36c2

> Add all table operators align Java Table API
> 
>
> Key: FLINK-12407
> URL: https://issues.apache.org/jira/browse/FLINK-12407
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: sunjincheng
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add all table operators to align with the Java Table API. The details can be found in 
> [FLIP-38|https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot edited a comment on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-19 Thread GitBox
flinkbot edited a comment on issue #8401: [FLINK-12407][python] Add all table 
operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#issuecomment-491201143
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
   
   ## Bot commands
   
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] sunjincheng121 commented on issue #8401: [FLINK-12407][python] Add all table operators align Java Table API.

2019-05-19 Thread GitBox
sunjincheng121 commented on issue #8401: [FLINK-12407][python] Add all table 
operators align Java Table API.
URL: https://github.com/apache/flink/pull/8401#issuecomment-493798944
 
 
   Thanks for the update!
   The doc makes sense to me. @WeiZhong94
   +1 to merge.
   @flinkbot approve all




[jira] [Closed] (FLINK-11051) Add Bounded(Group Window) FlatAggregate operator to streaming Table API

2019-05-19 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-11051.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: 142462fa4224c3435736ae21d32aaeb1586f1dce

> Add Bounded(Group Window) FlatAggregate operator to streaming Table API
> ---
>
> Key: FLINK-11051
> URL: https://issues.apache.org/jira/browse/FLINK-11051
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add FlatAggregate operator to streaming group window Table API as described 
> in 
> [FLIP-29|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739].
> The usage:
> {code:java}
> tab.window(Tumble/Session/Slide... as 'w)
>    .groupBy('w, 'k1, 'k2)
>    .flatAggregate(tableAggregate('a))
>    .select('w.rowtime, 'k1, 'k2, 'col1, 'col2)
> {code}





[GitHub] [flink] asfgit closed pull request #8359: [FLINK-11051][table] Add streaming window FlatAggregate to Table API

2019-05-19 Thread GitBox
asfgit closed pull request #8359: [FLINK-11051][table] Add streaming window 
FlatAggregate to Table API
URL: https://github.com/apache/flink/pull/8359
 
 
   




[jira] [Closed] (FLINK-12534) Reduce the test cost for Python API

2019-05-19 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12534.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: 063ee1bb11a580123ea0212b2c617ec6f71fc72a

> Reduce the test cost for Python API
> ---
>
> Key: FLINK-12534
> URL: https://issues.apache.org/jira/browse/FLINK-12534
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Travis
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, we run the Python API Travis tests for Scala 2.12 / Java 9 / Hadoop 
> 2.4.1. Because the Python API uses Py4j to communicate with the JVM, the test 
> for Java 9 is enough, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1.
>  





[jira] [Updated] (FLINK-12534) Reduce the test cost for Python API

2019-05-19 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12534:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-12308

> Reduce the test cost for Python API
> ---
>
> Key: FLINK-12534
> URL: https://issues.apache.org/jira/browse/FLINK-12534
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python, Travis
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: Wei Zhong
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, we run the Python API Travis tests for Scala 2.12 / Java 9 / Hadoop 
> 2.4.1. Because the Python API uses Py4j to communicate with the JVM, the test 
> for Java 9 is enough, and we can remove the tests for Scala 2.12 and Hadoop 2.4.1.
>  





[GitHub] [flink] asfgit closed pull request #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API

2019-05-19 Thread GitBox
asfgit closed pull request #8465: [FLINK-12534][travis][python] Reduce the test 
cost for Python API
URL: https://github.com/apache/flink/pull/8465
 
 
   




[GitHub] [flink] asfgit closed pull request #8470: [hotfix][table][docs] correct `distinct operator` doc for table API.

2019-05-19 Thread GitBox
asfgit closed pull request #8470: [hotfix][table][docs] correct `distinct 
operator` doc for table API.
URL: https://github.com/apache/flink/pull/8470
 
 
   




[GitHub] [flink] flinkbot edited a comment on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API

2019-05-19 Thread GitBox
flinkbot edited a comment on issue #8465: [FLINK-12534][travis][python] Reduce 
the test cost for Python API
URL: https://github.com/apache/flink/pull/8465#issuecomment-493037224
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @sunjincheng121 [committer]
   * ❗ 3. Needs [attention] from.
   - Needs attention by @zentol [PMC]
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   


[GitHub] [flink] sunjincheng121 commented on issue #8465: [FLINK-12534][travis][python] Reduce the test cost for Python API

2019-05-19 Thread GitBox
sunjincheng121 commented on issue #8465: [FLINK-12534][travis][python] Reduce 
the test cost for Python API
URL: https://github.com/apache/flink/pull/8465#issuecomment-493797710
 
 
   @flinkbot approve all
   Merging




[GitHub] [flink] flinkbot edited a comment on issue #8470: [hotfix][table][docs] correct `distinct operator` doc for table API.

2019-05-19 Thread GitBox
flinkbot edited a comment on issue #8470: [hotfix][table][docs] correct 
`distinct operator` doc for table API.
URL: https://github.com/apache/flink/pull/8470#issuecomment-493284070
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer of PMC member is required Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] sunjincheng121 commented on issue #8470: [hotfix][table][docs] correct `distinct operator` doc for table API.

2019-05-19 Thread GitBox
sunjincheng121 commented on issue #8470: [hotfix][table][docs] correct 
`distinct operator` doc for table API.
URL: https://github.com/apache/flink/pull/8470#issuecomment-493797564
 
 
   @flinkbot approve all 
   Merging




[GitHub] [flink] flinkbot commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
flinkbot commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog 
and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#issuecomment-493768589
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] xuefuz commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz commented on issue #8480: [FLINK-12552][table]: Combine HiveCatalog and 
GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480#issuecomment-493768604
 
 
   cc: @bowenli86 @twalthr 




[GitHub] [flink] xuefuz opened a new pull request #8480: [FLINK-12552][table]: Combine HiveCatalog and GenericHiveMetastoreCat…

2019-05-19 Thread GitBox
xuefuz opened a new pull request #8480: [FLINK-12552][table]: Combine 
HiveCatalog and GenericHiveMetastoreCat…
URL: https://github.com/apache/flink/pull/8480
 
 
   …alog
   
   
   
   ## What is the purpose of the change
   
   This PR combines GenericHiveMetastoreCatalog and HiveCatalog into a single 
class. The reasons are: 1. Both talk to the Hive metastore to store/retrieve 
metadata stored in Hive, and their implementations are very similar. 2. Users 
currently need two catalogs in order to store both generic metadata and Hive 
metadata. If the two point to the same Hive metastore instance, each will see 
the same list of meta objects (such as tables) but may not understand objects 
that it did not create itself. This creates a lot of confusion for users. If 
the two catalogs don't share a Hive metastore, that adds deployment overhead. 
Using a single catalog avoids both problems. 
   
   To differentiate the two types of meta objects in a single catalog, generic 
meta objects will have a property named "is_generic".
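
The property-based distinction described above could look roughly like the following. This is only a sketch under assumptions: the class `GenericFlagSketch`, the helper `isGeneric`, and the use of the string value "true" are hypothetical choices made for illustration and are not taken from the PR; only the property name "is_generic" comes from the description.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration, not code from the PR.
final class GenericFlagSketch {

    // Property name taken from the PR description.
    static final String IS_GENERIC = "is_generic";

    // Treats a meta object as generic only if the flag is present and equals "true"
    // (the value format is an assumption). A missing flag means "Hive object".
    static boolean isGeneric(Map<String, String> properties) {
        return Boolean.parseBoolean(properties.get(IS_GENERIC));
    }

    public static void main(String[] args) {
        Map<String, String> genericTable = new HashMap<>();
        genericTable.put(IS_GENERIC, "true");

        Map<String, String> hiveTable = new HashMap<>();

        System.out.println(isGeneric(genericTable)); // true
        System.out.println(isGeneric(hiveTable));    // false
    }
}
```

With a flag like this, a single catalog can store both kinds of objects in one metastore and still decide per object how to interpret it.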
   
   ## Brief change log
   
   *(for example:)*
 - Combined and merged code in GenericHiveMetastoreCatalog and 
HiveCatalogBase into HiveCatalog
 - Removed GenericHiveMetastoreCatalog and HiveCatalogBase class
 - Added abstract classes for tables/views/functions/partitions/databases 
for both types of meta objects
 - Adapted existing tests
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
GenericInMemoryCatalogTest, GenericHiveMetastoreCatalogTest, and 
HiveCatalogTest.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (No)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (No)
 - The S3 file system connector: (No)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[jira] [Updated] (FLINK-12552) Combine HiveCatalog and GenericHiveMetastoreCatalog

2019-05-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12552:
---
Labels: pull-request-available  (was: )

> Combine HiveCatalog and GenericHiveMetastoreCatalog
> ---
>
> Key: FLINK-12552
> URL: https://issues.apache.org/jira/browse/FLINK-12552
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12552) Combine HiveCatalog and GenericHiveMetastoreCatalog

2019-05-19 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created FLINK-12552:
---

 Summary: Combine HiveCatalog and GenericHiveMetastoreCatalog
 Key: FLINK-12552
 URL: https://issues.apache.org/jira/browse/FLINK-12552
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang








[GitHub] [flink] ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-19 Thread GitBox
ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the 
interface of ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#discussion_r285383592
 
 

 ##
 File path: 
flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.api.misc.persist.Persistable;
+import org.apache.flink.ml.util.param.ExtractParamInfosUtil;
+import org.apache.flink.ml.util.persist.MLStageFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for a stage in a pipeline. The interface is only a concept, and does not have any
+ * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes
+ * should inherit this interface directly.
+ *
+ * Each pipeline stage is with parameters and meanwhile persistable.
+ *
+ * @param <T> The class type of the PipelineStage implementation itself, used by {@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}
+ * @see WithParams
+ */
+interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable,
+   Persistable {
+
+   default String toJson() {
 
 Review comment:
   +1 to making this more flexible.
   If we want to support other formats for serialization/deserialization, 
e.g. PMML, then the names toJson/fromJson no longer match the logic.
   
   I suggest moving the serialization/deserialization logic out of this 
interface into a separate object, and injecting the serializer/deserializer 
into the pipeline object.
   I mean something like this:
   ```java
   class ExampleTransformer implements Transformer {
   
       private final PipelineSerializer serializer;
   
       public ExampleTransformer(PipelineSerializer implementationForSpecificFormat) {
           this.serializer = implementationForSpecificFormat;
       }
   
       public String serialization() { // renamed version of toJson()
           return this.serializer.serialize(this);
       }
       // ... a similar approach for the deserializer
   }
   ```
   Or move the serialization and deserialization methods out of the Pipeline 
objects altogether:
   ```java
   Transformer example = new ExampleTransformer();
   // actions with ExampleTransformer
   String serializedPipeline =
       new PipelineSerializer("supported_format_name_like_json").serialize(example);
   ```
   I think this approach is more flexible for supporting different 
serialization/deserialization formats for pipelines, and we can make these 
changes in the next PR if the Flink community approves this proposal.
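
The injection idea above can also be written in a self-contained form. This is only an illustration under assumptions: `StageSerializer`, `ExampleStage`, and the JSON-like output string are hypothetical names and formats invented here, not types from the PR or from flink-ml.

```java
// Hypothetical sketch of a format-agnostic serializer injected into a stage.
interface StageSerializer<S> {
    String serialize(S stage);
}

final class ExampleStage {
    final String name;
    private final StageSerializer<ExampleStage> serializer;

    ExampleStage(String name, StageSerializer<ExampleStage> serializer) {
        this.name = name;
        this.serializer = serializer;
    }

    // Format-neutral replacement for a method named toJson().
    String serialization() {
        return serializer.serialize(this);
    }

    public static void main(String[] args) {
        // One possible format; another format would just be another lambda or class.
        StageSerializer<ExampleStage> jsonLike = s -> "{\"name\":\"" + s.name + "\"}";
        System.out.println(new ExampleStage("scaler", jsonLike).serialization());
        // prints {"name":"scaler"}
    }
}
```

The stage never names a concrete format, so adding a new one does not require touching the stage interface.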
   
   




[GitHub] [flink] ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-19 Thread GitBox
ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the 
interface of ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#discussion_r285383592
 
 

 ##
 File path: 
flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.api.misc.persist.Persistable;
+import org.apache.flink.ml.util.param.ExtractParamInfosUtil;
+import org.apache.flink.ml.util.persist.MLStageFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for a stage in a pipeline. The interface is only a concept, and does not have any
+ * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes
+ * should inherit this interface directly.
+ *
+ * Each pipeline stage is with parameters and meanwhile persistable.
+ *
+ * @param <T> The class type of the PipelineStage implementation itself, used by {@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}
+ * @see WithParams
+ */
+interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable,
+   Persistable {
+
+   default String toJson() {
 
 Review comment:
   +1 to making this more flexible.
   If we want to support other formats for serialization/deserialization, 
e.g. PMML, then the names toJson/fromJson no longer match the logic.
   
   I suggest moving the serialization/deserialization logic out of this 
interface into a separate object, and injecting the serializer/deserializer 
into the pipeline object.
   I mean something like this:
   ```java
   class ExampleTransformer implements Transformer {
   
       private final PipelineSerializer serializer;
   
       public ExampleTransformer(PipelineSerializer implementationForSpecificFormat) {
           this.serializer = implementationForSpecificFormat;
       }
   
       public String serialization() { // renamed version of toJson()
           return this.serializer.serialize(this);
       }
       // ... a similar approach for the deserializer
   }
   ```
   Or move the serialization and deserialization methods out of the Pipeline 
objects altogether:
   ```java
   Transformer example = new ExampleTransformer();
   // actions with ExampleTransformer
   String serializedPipeline =
       new PipelineSerializer("supported_format_name_like_json").serialize(example);
   ```
   I think this approach is more flexible for supporting different 
serialization/deserialization formats for pipelines, and we can make these 
changes in the next PR if the community approves this proposal.
   
   




[GitHub] [flink] ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the interface of ML pipeline and ML lib

2019-05-19 Thread GitBox
ex00 commented on a change in pull request #8402: [FLINK-12473][ml] Add the 
interface of ML pipeline and ML lib
URL: https://github.com/apache/flink/pull/8402#discussion_r285383592
 
 

 ##
 File path: 
flink-ml/flink-ml-api/src/main/java/org/apache/flink/ml/api/core/PipelineStage.java
 ##
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api.core;
+
+import org.apache.flink.ml.api.misc.param.ParamInfo;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.api.misc.persist.Persistable;
+import org.apache.flink.ml.util.param.ExtractParamInfosUtil;
+import org.apache.flink.ml.util.persist.MLStageFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class for a stage in a pipeline. The interface is only a concept, and does not have any
+ * actual functionality. Its subclasses must be either Estimator or Transformer. No other classes
+ * should inherit this interface directly.
+ *
+ * Each pipeline stage is with parameters and meanwhile persistable.
+ *
+ * @param <T> The class type of the PipelineStage implementation itself, used by {@link
+ *            org.apache.flink.ml.api.misc.param.WithParams}
+ * @see WithParams
+ */
+interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable,
+   Persistable {
+
+   default String toJson() {
 
 Review comment:
   +1 to renaming the methods for serializing/deserializing model stages.
   If we want to support other formats for serialization/deserialization, 
e.g. PMML, then the names toJson/fromJson no longer match the logic.
   
   I suggest moving the serialization/deserialization logic out of this 
interface into a separate object, and injecting the serializer/deserializer 
into the pipeline object.
   I mean something like this:
   ```java
   class ExampleTransformer implements Transformer {
   
       private final PipelineSerializer serializer;
   
       public ExampleTransformer(PipelineSerializer implementationForSpecificFormat) {
           this.serializer = implementationForSpecificFormat;
       }
   
       public String serialization() { // renamed version of toJson()
           return this.serializer.serialize(this);
       }
       // ... a similar approach for the deserializer
   }
   ```
   Or move the serialization and deserialization methods out of the Pipeline 
objects altogether:
   ```java
   Transformer example = new ExampleTransformer();
   // actions with ExampleTransformer
   String serializedPipeline =
       new PipelineSerializer("supported_format_name_like_json").serialize(example);
   ```
   I think this approach is more flexible for supporting different 
serialization/deserialization formats for pipelines, and we can make these 
changes in the next PR if the community approves this proposal.
   
   




[GitHub] [flink] Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle the wrapped function

2019-05-19 Thread GitBox
Aitozi commented on issue #8280: [FLINK-12297]Harden ClosureCleaner to handle 
the wrapped function
URL: https://github.com/apache/flink/pull/8280#issuecomment-493753651
 
 
   ping @StephanEwen 




[GitHub] [flink] Aitozi commented on issue #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-05-19 Thread GitBox
Aitozi commented on issue #8479: [FLINK-11193][State Backends]Use user passed 
configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#issuecomment-493753561
 
 
   Hi @StefanRRichter, could you take a look at this PR when you have time? 
Thanks.




[GitHub] [flink] flinkbot commented on issue #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-05-19 Thread GitBox
flinkbot commented on issue #8479: [FLINK-11193][State Backends]Use user passed 
configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479#issuecomment-493753515
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] Aitozi opened a new pull request #8479: [FLINK-11193][State Backends]Use user passed configuration overriding default configuration loading from file

2019-05-19 Thread GitBox
Aitozi opened a new pull request #8479: [FLINK-11193][State Backends]Use user 
passed configuration overriding default configuration loading from file
URL: https://github.com/apache/flink/pull/8479
 
 
   ## What is the purpose of the change
   
   As described in the 
[jira](https://issues.apache.org/jira/browse/FLINK-11193), a user's custom 
configuration set through the `backend.configure()` method is overridden by 
the configuration loaded from flink-conf.yaml. I think the configuration set 
in code should have a higher priority than the default file configuration.
   
   ## Brief change log
   
   Merge the configurations before creating the new state backend.
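
The intended precedence (values set in code win over file defaults) can be sketched with plain maps. This is not the PR's implementation and does not use Flink's configuration classes; `ConfigMergeSketch`, `merge`, and the option keys are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of "user config overrides file defaults".
final class ConfigMergeSketch {

    // Starts from the file defaults, then lets user-provided entries overwrite them.
    static Map<String, String> merge(Map<String, String> fileDefaults,
                                     Map<String, String> userConfig) {
        Map<String, String> merged = new HashMap<>(fileDefaults);
        merged.putAll(userConfig);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> fileDefaults = new HashMap<>();
        fileDefaults.put("example.option.a", "from-file");
        fileDefaults.put("example.option.b", "from-file");

        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("example.option.a", "from-code");

        Map<String, String> merged = merge(fileDefaults, userConfig);
        System.out.println(merged.get("example.option.a")); // from-code
        System.out.println(merged.get("example.option.b")); // from-file
    }
}
```

The order of the two steps is the whole point: applying the file defaults last would reproduce the behaviour reported in the JIRA issue.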
   
   
   ## Verifying this change
   
   Adding a test case to verify.
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   




[jira] [Comment Edited] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-19 Thread Felix seibert (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843397#comment-16843397
 ] 

Felix seibert edited comment on FLINK-12550 at 5/19/19 11:25 AM:
-

After opening PR #8478 yesterday, I have some additional considerations.

The status quo is the following:

To check if an input split is locally available for a taskmanager, the hostname 
of the taskmanager is compared to the hostname of the input split. This happens 
in this 
[line|https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]:
{code:java}
if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost))
{code}
h is the hostname of a machine hosting the input split, and flinkHost is the 
hostname of the taskmanager that is looking for an input split. 
NetUtils.getHostnameFromFQDN() truncates at the first occurrence of a ".". So, 
if a split is present on "host.domain", and the hostname of the taskmanager is 
"host.domain" too, we actually check whether "host".equals("host.domain"), 
which is not true. PR #8478 applies getHostnameFromFQDN() to the taskmanager 
hostname as well, so it seems that this problem is fixed.

 

BUT. What if there is a taskmanager on host "host.cluster1.domain", and an 
input split on host "host.cluster2.domain"? isLocal() would recognize this 
split as being on the same host as the taskmanager, which is clearly not the 
case.

So to me it looks like getHostnameFromFQDN() shouldn't be applied to either of 
the two compared hostnames.

Or is there any reason why it should be applied?
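
Both cases described in this comment can be reproduced with a small standalone sketch. It is an illustration only: `HostnameMatchSketch` and its methods are hypothetical, `firstLabel()` merely mimics the "truncate at the first dot" behaviour described above, and `isLocalBothTruncated()` is one reading of what PR #8478 does, not its actual code.

```java
import java.util.Locale;

// Hypothetical illustration, not Flink code.
final class HostnameMatchSketch {

    // Returns everything before the first '.', or the whole string if there is none.
    static String firstLabel(String fqdn) {
        int dot = fqdn.indexOf('.');
        return dot == -1 ? fqdn : fqdn.substring(0, dot);
    }

    // Status quo as described: only the split's hostname is truncated.
    static boolean isLocalStatusQuo(String splitHost, String flinkHost) {
        return firstLabel(splitHost.toLowerCase(Locale.ROOT)).equals(flinkHost);
    }

    // Assumed variant: both hostnames are truncated before comparing.
    static boolean isLocalBothTruncated(String splitHost, String flinkHost) {
        return firstLabel(splitHost.toLowerCase(Locale.ROOT))
                .equals(firstLabel(flinkHost.toLowerCase(Locale.ROOT)));
    }

    public static void main(String[] args) {
        // Same machine, but never recognized as local: "host" vs "host.domain".
        System.out.println(isLocalStatusQuo("host.domain", "host.domain")); // false
        // Different machines, but recognized as local: "host" vs "host".
        System.out.println(
                isLocalBothTruncated("host.cluster1.domain", "host.cluster2.domain")); // true
    }
}
```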

 


was (Author: felxe):
After opening PR #8478 yesterday, I have some additional considerations.

The status quo is the following:

To check if an input split is locally available for a taskmanager, the hostname 
of the taskmanager is compared to the hostname of the input split. This happens 
in this 
[line|https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]:
 
{code:java}
if (h != null && 
NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost)){code}
h is the hostname of a machine hosting the input split, flinkHost is the 
taskmanager that is looking for an input split. NetUtils.getHostnameFromFQDN() 
truncates at the first occurrence of a ".". So, if a split is present on 
"host.domain", and the hostname of the taskmanager is "host.domain" too, we 
actually check whether "host".equals("host.domain") which is not true. PR #8478 
applies getHostnameFromFQDN() on the taskmanager hostname as well, so it seems 
that this problem is fixed.

 

BUT. What if there is a taskmanager on host "host.cluster1.domain", and an 
input split on host "host.cluster2.domain"? isLocal() would recognize this 
split as being on the same host as the taskmanager, which is clearly not the 
case.

So to me it looks like getHostnameFromFQDN() shouldn't be applied to either of 
the two compared hostnames.

Or is there any reason why it should be applied?

 

> hostnames with a dot never receive local input splits
> -
>
> Key: FLINK-12550
> URL: https://issues.apache.org/jira/browse/FLINK-12550
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataSet
>Affects Versions: 1.8.0
>Reporter: Felix seibert
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> LocatableInputSplitAssigner (in package api.common.io) fails to assign local 
> input splits to hosts whose hostname contains a dot ("."). To reproduce add 
> the following test to LocatableSplitAssignerTest and execute it. It will 
> always fail. In my mind, this is contrary to the expected behaviour, which is 
> that the host should obtain the one split that is stored on the very same 
> machine.
>  
> {code:java}
> @Test
> public void testLocalSplitAssignmentForHostWithDomainName() {
>     try {
>         String hostNameWithDot = "testhost.testdomain";
>         // load one split
>         Set<LocatableInputSplit> splits = new HashSet<LocatableInputSplit>();
>         splits.add(new LocatableInputSplit(0, hostNameWithDot));
>         // get next split for the host
>         LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
>         InputSplit is = null;
>         ia.getNextInputSplit(hostNameWithDot, 0);
>         // there should be exactly zero remote and one local assignment
>         assertEquals(0, ia.getNumberOfRemoteAssignments());
>         assertEquals(1, ia.getNumberOfLocalAssignments());
>     }
>     catch (Exception e) {
>         e.printStackTrace();
>         fail(e.getMessage());
>     }
> }
> {code}
> I also 

[jira] [Comment Edited] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-19 Thread Felix seibert (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843397#comment-16843397
 ] 

Felix seibert edited comment on FLINK-12550 at 5/19/19 11:25 AM:
-

After opening PR #8478 yesterday, I have some additional considerations.

The status quo is the following:

To check if an input split is locally available for a taskmanager, the hostname 
of the taskmanager is compared to the hostname of the input split. This happens 
in this line 
[https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]:
{code:java}
if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost))
{code}
h is the hostname of a machine hosting the input split, and flinkHost is the 
hostname of the taskmanager that is looking for an input split. 
NetUtils.getHostnameFromFQDN() truncates at the first occurrence of a ".". So, 
if a split is present on "host.domain", and the hostname of the taskmanager is 
"host.domain" too, we actually check whether "host".equals("host.domain"), 
which is not true. PR #8478 applies getHostnameFromFQDN() to the taskmanager 
hostname as well, so it seems that this problem is fixed.

 

BUT. What if there is a taskmanager on host "host.cluster1.domain", and an 
input split on host "host.cluster2.domain"? isLocal() would recognize this 
split as being on the same host as the taskmanager, which is clearly not the 
case.

So to me it looks like getHostNameFromFQDN() shouldn't be applied on neither of 
the two compared hostnames.

Or is there any reason why it should be applied?
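
To make the cases above concrete, here is a minimal, self-contained sketch. All class and method names in it are hypothetical stand-ins, not Flink code: truncateAtFirstDot() only mimics the truncation that NetUtils.getHostnameFromFQDN() is described as doing above, and the three isLocal variants model the status quo, the approach of PR #8478 as described above, and a plain full-name comparison.

```java
import java.util.Locale;

// Hypothetical stand-ins for illustration only; not Flink's actual classes.
final class HostnameLocalitySketch {

    // Mimics the described truncation: keep everything before the first dot.
    static String truncateAtFirstDot(String fqdn) {
        int dot = fqdn.indexOf('.');
        return dot == -1 ? fqdn : fqdn.substring(0, dot);
    }

    // Status quo: only the split host is truncated before comparing.
    static boolean isLocalStatusQuo(String splitHost, String taskManagerHost) {
        return truncateAtFirstDot(splitHost.toLowerCase(Locale.ROOT))
                .equals(taskManagerHost);
    }

    // Both hostnames are truncated before comparing.
    static boolean isLocalBothTruncated(String splitHost, String taskManagerHost) {
        return truncateAtFirstDot(splitHost.toLowerCase(Locale.ROOT))
                .equals(truncateAtFirstDot(taskManagerHost.toLowerCase(Locale.ROOT)));
    }

    // Neither hostname is truncated; full names are compared ignoring case.
    static boolean isLocalFullName(String splitHost, String taskManagerHost) {
        return splitHost.equalsIgnoreCase(taskManagerHost);
    }
}
```

With these stand-ins, isLocalStatusQuo("host.domain", "host.domain") is false (the reported bug), isLocalBothTruncated("host.cluster1.domain", "host.cluster2.domain") is true (the false positive), and isLocalFullName() gives the expected answer in both cases.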

 




[jira] [Comment Edited] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-19 Thread Felix seibert (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843397#comment-16843397
 ] 

Felix seibert edited comment on FLINK-12550 at 5/19/19 11:24 AM:
-

After opening PR #8478 yesterday, I have some additional considerations.

The status quo is the following:

To check if an input split is locally available for a taskmanager, the hostname 
of the taskmanager is compared to the hostname of the input split. This happens 
in 
[this line|https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]:

{code:java}
if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost))
{code}
h is the hostname of a machine hosting the input split, flinkHost is the 
hostname of the taskmanager that is looking for an input split. 
NetUtils.getHostnameFromFQDN() truncates at the first occurrence of a ".". So, 
if a split is present on "host.domain", and the hostname of the taskmanager is 
"host.domain" too, we actually check whether "host".equals("host.domain"), 
which is not true. PR #8478 applies getHostnameFromFQDN() to the taskmanager 
hostname as well, so it seems that this problem is fixed.

BUT. What if there is a taskmanager on host "host.cluster1.domain", and an 
input split on host "host.cluster2.domain"? isLocal() would recognize this 
split as being on the same host as the taskmanager, which is clearly not the 
case.

So to me it looks like getHostnameFromFQDN() shouldn't be applied to either of 
the two compared hostnames.

Or is there any reason why it should be applied?

 




[jira] [Comment Edited] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-19 Thread Felix seibert (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843397#comment-16843397
 ] 

Felix seibert edited comment on FLINK-12550 at 5/19/19 11:23 AM:
-

After opening PR #8478 yesterday, I have some additional considerations.

The status quo is the following:

To check if an input split is locally available for a taskmanager, the hostname 
of the taskmanager is compared to the hostname of the input split. This happens 
in 
[this line|https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]:

{code:java}
if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost))
{code}
h is the hostname of a machine hosting the input split, flinkHost is the 
hostname of the taskmanager that is looking for an input split. 
NetUtils.getHostnameFromFQDN() truncates at the first occurrence of a ".". So, 
if a split is present on "host.domain", and the hostname of the taskmanager is 
"host.domain" too, we actually check whether "host".equals("host.domain"), 
which is not true. PR #8478 applies getHostnameFromFQDN() to the taskmanager 
hostname as well, so it seems that this problem is fixed.

BUT. What if there is a taskmanager on host "host.cluster1.domain", and an 
input split on host "host.cluster2.domain"? isLocal() would recognize this 
split as being on the same host as the taskmanager, which is clearly not the 
case.

So to me it looks like getHostnameFromFQDN() shouldn't be applied to either of 
the two compared hostnames.

Or is there any reason why it should be applied?

 




[jira] [Comment Edited] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-19 Thread Felix seibert (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843397#comment-16843397
 ] 

Felix seibert edited comment on FLINK-12550 at 5/19/19 11:23 AM:
-

After opening PR #8478 yesterday, I have some additional considerations.

The status quo is the following:

To check if an input split is locally available for a taskmanager, the hostname 
of the taskmanager is compared to the hostname of the input split. This happens 
in 
[this line|https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]:

{code:java}
if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost))
{code}
h is the hostname of a machine hosting the input split, flinkHost is the 
hostname of the taskmanager that is looking for an input split. 
NetUtils.getHostnameFromFQDN() truncates at the first occurrence of a ".". So, 
if a split is present on "host.domain", and the hostname of the taskmanager is 
"host.domain" too, we actually check whether "host".equals("host.domain"), 
which is not true. PR #8478 applies getHostnameFromFQDN() to the taskmanager 
hostname as well, so it seems that this problem is fixed.

BUT. What if there is a taskmanager on host "host.cluster1.domain", and an 
input split on host "host.cluster2.domain"? isLocal() would recognize this 
split as being on the same host as the taskmanager, which is clearly not the 
case.

So to me it looks like getHostnameFromFQDN() shouldn't be applied to either of 
the two compared hostnames.

Or is there any reason why it should be applied?

 




[jira] [Commented] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-19 Thread Felix seibert (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843397#comment-16843397
 ] 

Felix seibert commented on FLINK-12550:
---

After opening PR #8478 yesterday, I have some additional considerations.

The status quo is the following:
 * To check if an input split is locally available for a taskmanager, the 
hostname of the taskmanager is compared to the hostname of the input split. 
This happens in 
[this line|https://github.com/apache/flink/blob/4fa387164cea44f8e0bac1aadab11433c0f0ff2b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java#L223]:

{code:java}
if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost))
{code}
h is the hostname of a machine hosting the input split, flinkHost is the 
hostname of the taskmanager that is looking for an input split. 
NetUtils.getHostnameFromFQDN() truncates at the first occurrence of a ".". So, 
if a split is present on "host.domain", and the hostname of the taskmanager is 
"host.domain" too, we actually check whether "host".equals("host.domain"), 
which is not true. PR #8478 applies getHostnameFromFQDN() to the taskmanager 
hostname as well, so it seems that this problem is fixed.

BUT. What if there is a taskmanager on host "host.cluster1.domain", and an 
input split on host "host.cluster2.domain"? isLocal() would recognize this 
split as being on the same host as the taskmanager, which is clearly not the 
case.

So to me it looks like getHostnameFromFQDN() shouldn't be applied to either of 
the two compared hostnames.

Or is there any reason why it should be applied?

 

> hostnames with a dot never receive local input splits
> -
>
> Key: FLINK-12550
> URL: https://issues.apache.org/jira/browse/FLINK-12550
> Project: Flink
> Issue Type: Bug
> Components: API / DataSet
> Affects Versions: 1.8.0
> Reporter: Felix seibert
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> LocatableInputSplitAssigner (in package api.common.io) fails to assign local 
> input splits to hosts whose hostname contains a dot ("."). To reproduce add 
> the following test to LocatableSplitAssignerTest and execute it. It will 
> always fail. In my mind, this is contrary to the expected behaviour, which is 
> that the host should obtain the one split that is stored on the very same 
> machine.
>  
> {code:java}
> @Test
> public void testLocalSplitAssignmentForHostWithDomainName() {
>    try {
>       String hostNameWithDot = "testhost.testdomain";
>       // load one split
>       Set splits = new HashSet();
>       splits.add(new LocatableInputSplit(0, hostNameWithDot));
>       // get next split for the host
>       LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
>       InputSplit is = null;
>       ia.getNextInputSplit(hostNameWithDot, 0);
>       // there should be exactly zero remote and one local assignment
>       assertEquals(0, ia.getNumberOfRemoteAssignments());
>       assertEquals(1, ia.getNumberOfLocalAssignments());
>    }
>    catch (Exception e) {
>       e.printStackTrace();
>       fail(e.getMessage());
>    }
> }
> {code}
> I also experienced this error in practice, and will later today open a pull 
> request to fix it.
>  
> Note: I'm not sure if I selected the correct component category.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12550) hostnames with a dot never receive local input splits

2019-05-19 Thread Felix seibert (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Felix seibert updated FLINK-12550:
--
Description: 
LocatableInputSplitAssigner (in package api.common.io) fails to assign local 
input splits to hosts whose hostname contains a dot ("."). To reproduce add the 
following test to LocatableSplitAssignerTest and execute it. It will always 
fail. In my mind, this is contrary to the expected behaviour, which is that the 
host should obtain the one split that is stored on the very same machine.

 
{code:java}
@Test
public void testLocalSplitAssignmentForHostWithDomainName() {
   try {
      String hostNameWithDot = "testhost.testdomain";

      // load one split
      Set splits = new HashSet();
      splits.add(new LocatableInputSplit(0, hostNameWithDot));

      // get next split for the host
      LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
      InputSplit is = null;
      ia.getNextInputSplit(hostNameWithDot, 0);

      // there should be exactly zero remote and one local assignment
      assertEquals(0, ia.getNumberOfRemoteAssignments());
      assertEquals(1, ia.getNumberOfLocalAssignments());
   }
   catch (Exception e) {
      e.printStackTrace();
      fail(e.getMessage());
   }
}
{code}
I also experienced this error in practice, and will later today open a pull 
request to fix it.

 

Note: I'm not sure if I selected the correct component category.

 

  was:
LocatableInputSplitAssigner (in package api.common.io) fails to assign local 
input splits to hosts whose hostname contains a dot ("."). To reproduce add the 
following test to LocatableSplitAssignerTest and execute it. It will always 
fail. In my mind, this is contrary to the expected behaviour, which is that the 
host should obtain the one split that is stored on the very same machine.

 
{code:java}
@Test
public void testLocalSplitAssignmentForHostWithDomainName() {
   try {
      String hostNameWithDot = "testhost.testdomain";

      // load one split
      Set splits = new HashSet();
      splits.add(new LocatableInputSplit(0, hostNameWithDot));

      // get all available splits
      LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits);
      InputSplit is = null;
      ia.getNextInputSplit(hostNameWithDot, 0);

      assertEquals(0, ia.getNumberOfRemoteAssignments());
      assertEquals(1, ia.getNumberOfLocalAssignments());
   }
   catch (Exception e) {
      e.printStackTrace();
      fail(e.getMessage());
   }
}
{code}
I also experienced this error in practice, and will later today open a pull 
request to fix it.

 

Note: I'm not sure if I selected the correct component category.

 







[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-19 Thread GitBox
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State 
TTL] Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r285374038
 
 

 ##
 File path: docs/dev/stream/state/state.md
 ##
 @@ -400,6 +400,17 @@ This option is not applicable for the incremental 
checkpointing in the RocksDB s
 - For existing jobs, this cleanup strategy can be activated or deactivated 
anytime in `StateTtlConfig`, 
 e.g. after restart from savepoint.
 
+ Cleanup in background
+
+Besides cleanup in full snapshot, you also can activate the cleanup in 
background. The option will adapt
+all the cleanup strategies with default values. Users who do not need to think 
about details or used backend.
+Currently, Flink provide two kind of cleanup strategies in background and the 
default config value list below:
+
+* Incremental cleanup(cleanup size: 10, run cleanup for every record: true)
+* Cleanup during RocksDB compaction(query time after how many entries: 1000)
+
+If users who want to get more fine-grained control about the special cleanup 
in background can use it directly.
+
 
 Review comment:
   We also need a code example here:
   ```
   Besides cleanup in full snapshot, you can also activate the cleanup in 
background. This option will activate
   a strategy configured by default if the background cleanup is supported for 
the used backend. 
   
   This feature can be activated in `StateTtlConfig`:
   << ... code example is needed like in other chapters  >
   
   For more fine-grained control over some special cleanup in background, you 
can configure it separately as described below.
   ```
   
   This should go to the respective chapters where the parameter values are 
discussed:
   ```
* Incremental cleanup(cleanup size: 10, run cleanup for every record: true)
* Cleanup during RocksDB compaction(query time after how many entries: 1000)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8459: [FLINK-12476] [State TTL] Consider setting a default background cleanup strategy in StateTtlConfig

2019-05-19 Thread GitBox
azagrebin commented on a change in pull request #8459: [FLINK-12476] [State 
TTL] Consider setting a default background cleanup strategy in StateTtlConfig
URL: https://github.com/apache/flink/pull/8459#discussion_r285373468
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ##
 @@ -202,6 +204,15 @@ private IS createReducingState() throws Exception {
}
 
private TtlIncrementalCleanup getTtlIncrementalCleanup() {
+   //if cleanup in background and user did not specify the cleanup 
strategy,
+   //then build the incremental cleanup strategy and apply the 
default value
+   if (ttlConfig.isCleanupInBackground() &&
+   
ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy() == null) {
+   ttlConfig.getCleanupStrategies().activate(
 
 Review comment:
   I would be against modifying `ttlConfig` in any way. It is supposed to be 
immutable once created by its builder. This protects against implicit changes 
to a `ttlConfig` created by the user. We should actually make 
`CleanupStrategies.strategies/activate` private or, even better, move 
`CleanupStrategies.strategies` to the builder (instead of keeping a 
`CleanupStrategies` there) and remove `CleanupStrategies.activate`. The builder 
should then also build an immutable `CleanupStrategies` for `StateTtlConfig` at 
the end.
   
   If we move `isCleanupInBackground` to `CleanupStrategies` together with its 
getter (where it rather belongs anyway), we can just modify 
`CleanupStrategies.getIncrementalCleanupStrategy/getRocksdbCompactFilterCleanupStrategy`
 to return the default strategies via `strategies.getOrDefault` if they are not 
set but `isCleanupInBackground` is set. The default strategy singletons can be 
static final constants in the respective classes. The backends and private 
field modifiers can also stay untouched, but any strategy and backend can still 
independently decide what to use if `isCleanupInBackground` is set. 
`CleanupStrategies.inFullSnapshot/inRocksdbCompactFilter` should then check 
that the respective getters return non-null values rather than reading 
`CleanupStrategies.strategies` directly.
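
   A rough, self-contained sketch of this idea follows. It is an assumption-laden illustration rather than the actual Flink classes: `TtlCleanupSketch`, its nested types and the `Strategy` enum are hypothetical, only the incremental strategy is modelled, and the default values (cleanup size 10, run for every record) are the ones listed in the docs draft of this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; not the real StateTtlConfig classes.
final class TtlCleanupSketch {

    enum Strategy { INCREMENTAL }

    static final class IncrementalCleanup {
        // Default singleton, kept as a static final constant in the strategy class.
        static final IncrementalCleanup DEFAULT = new IncrementalCleanup(10, true);

        final int cleanupSize;
        final boolean runForEveryRecord;

        IncrementalCleanup(int cleanupSize, boolean runForEveryRecord) {
            this.cleanupSize = cleanupSize;
            this.runForEveryRecord = runForEveryRecord;
        }
    }

    // Immutable once built: no activate() method, no exposed mutable map.
    static final class CleanupStrategies {
        private final boolean cleanupInBackground;
        private final Map<Strategy, IncrementalCleanup> strategies;

        private CleanupStrategies(boolean cleanupInBackground,
                                  Map<Strategy, IncrementalCleanup> strategies) {
            this.cleanupInBackground = cleanupInBackground;
            this.strategies = Collections.unmodifiableMap(new HashMap<>(strategies));
        }

        boolean isCleanupInBackground() {
            return cleanupInBackground;
        }

        // Explicit strategy if set; the default if only background cleanup is on; else null.
        IncrementalCleanup getIncrementalCleanupStrategy() {
            return cleanupInBackground
                    ? strategies.getOrDefault(Strategy.INCREMENTAL, IncrementalCleanup.DEFAULT)
                    : strategies.get(Strategy.INCREMENTAL);
        }
    }

    // The mutable state lives only in the builder.
    static final class Builder {
        private boolean cleanupInBackground;
        private final Map<Strategy, IncrementalCleanup> strategies = new HashMap<>();

        Builder cleanupInBackground() {
            this.cleanupInBackground = true;
            return this;
        }

        Builder cleanupIncrementally(int cleanupSize, boolean runForEveryRecord) {
            strategies.put(Strategy.INCREMENTAL,
                    new IncrementalCleanup(cleanupSize, runForEveryRecord));
            return this;
        }

        CleanupStrategies build() {
            return new CleanupStrategies(cleanupInBackground, strategies);
        }
    }
}
```

   In this sketch the state factory never has to touch the config: it only calls the getter and treats a `null` result as "strategy not active".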




[jira] [Updated] (FLINK-12299) ExecutionConfig#setAutoWatermarkInterval should check param(interval should not less than zero)

2019-05-19 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin updated FLINK-12299:

Component/s: (was: Runtime / Coordination)
 Runtime / Operators
 API / DataStream

> ExecutionConfig#setAutoWatermarkInterval should check param(interval should 
> not less than zero)
> ---
>
> Key: FLINK-12299
> URL: https://issues.apache.org/jira/browse/FLINK-12299
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream, Runtime / Operators
> Reporter: shiwuliang
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> In any scenario, `autoWatermarkInterval` should not be less than or equal to 
> zero.
> First of all, such a value does not correspond to the meaning of 
> `autoWatermarkInterval`.
> Second, in the case where `autoWatermarkInterval` is less than 0, we will not 
> be able to register ourselves in 
> `TimestampsAndPeriodicWatermarksOperator#open`, which will result in the 
> watermark of this stream staying at its lowest value.
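
The kind of parameter check requested here could look roughly like the sketch below. `ExecutionConfigSketch` is a hypothetical stand-in, not Flink's `ExecutionConfig`. It rejects only negative values, following the issue title; whether zero should be rejected as well (as the description suggests) is left open.

```java
// Hypothetical stand-in for illustration only; not Flink's ExecutionConfig.
final class ExecutionConfigSketch {

    private long autoWatermarkInterval;

    // Fails fast on a negative interval instead of letting the job run
    // with a watermark that never advances.
    ExecutionConfigSketch setAutoWatermarkInterval(long intervalMillis) {
        if (intervalMillis < 0) {
            throw new IllegalArgumentException(
                    "Auto watermark interval must not be negative: " + intervalMillis);
        }
        this.autoWatermarkInterval = intervalMillis;
        return this;
    }

    long getAutoWatermarkInterval() {
        return autoWatermarkInterval;
    }
}
```

Validating in the setter surfaces the mistake at configuration time, where the offending value is still visible in the stack trace.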


