[GitHub] [flink] twalthr commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system

2019-05-08 Thread GitBox
twalthr commented on a change in pull request #8360: 
[FLINK-12393][table-common] Add the user-facing classes of the new type system
URL: https://github.com/apache/flink/pull/8360#discussion_r282347087
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
 ##
 @@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Describes the data type of a value in the table ecosystem. Instances of 
this class can be used to
+ * declare input and/or output types of operations.
+ *
+ * The {@link DataType} class has two responsibilities: declaring a logical 
type and giving hints
+ * about the physical representation of data to the optimizer. While the 
logical type is mandatory,
+ * hints are optional but useful at the edges to other APIs.
+ *
+ * The logical type is independent of any physical representation and is 
close to the "data type"
+ * terminology of the SQL standard. See {@link 
org.apache.flink.table.types.logical.LogicalType} and
+ * its subclasses for more information about available logical types and their 
properties.
+ *
+ * Physical hints are required at the edges of the table ecosystem. Hints 
indicate the data format
+ * that an implementation expects. For example, a data source could express 
that it produces values for
+ * logical timestamps using a {@link java.sql.Timestamp} class instead of 
using {@link java.time.LocalDateTime}.
+ * With this information, the runtime is able to convert the produced class 
into its internal data
+ * format. In return, a data sink can declare the data format it consumes from 
the runtime.
+ *
+ * @see DataTypes for a list of supported data types and instances of this 
class.
+ */
+@PublicEvolving
+public abstract class DataType implements Serializable {
+
+   protected LogicalType logicalType;
+
+   protected @Nullable Class<?> conversionClass;
+
+   private DataType(LogicalType logicalType, @Nullable Class<?> 
conversionClass) {
+   this.logicalType = Preconditions.checkNotNull(logicalType, 
"Logical type must not be null.");
+   this.conversionClass = performEarlyClassValidation(logicalType, 
conversionClass);
+   }
+
+   /**
+* Returns the corresponding logical type.
+*
+* @return a parameterized instance of {@link LogicalType}
+*/
+   public LogicalType getLogicalType() {
+   return logicalType;
+   }
+
+   /**
+* Returns the corresponding conversion class for representing values. 
If no conversion class was
+* defined manually, the default conversion defined by the logical type 
is used.
+*
+* @see LogicalType#getDefaultConversion()
+*
+* @return the expected conversion class
+*/
+   public Class<?> getConversionClass() {
+   if (conversionClass == null) {
+   return logicalType.getDefaultConversion();
+   }
+   return conversionClass;
+   }
+
+   /**
+* Adds a hint that null values are not expected in the data for this 
type.
+*
+* @return a new, reconfigured data type instance
+*/
+   public abstract DataType notNull();
+
+   /**
+* Adds a hint that null values are expected in the data for this type 
(default behavior).
+*
+* This method exists for explicit declaration of the default 
behavior or for invalidation of
+* a previous call to {@link #notNull()}.
+*
+* @return a new, reconfigured data type instance
+*/
+   public abstract DataType andNull();
+
+   /**
+* Adds a hint that data should be represented using 

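For illustration, a minimal, hypothetical usage sketch of the DataType API quoted above. It only uses names visible in the excerpt (`DataTypes.VARCHAR`, `notNull()`, `getLogicalType()`, `getConversionClass()`); exact signatures may differ from what was eventually merged.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

public class DataTypeUsageSketch {
    public static void main(String[] args) {
        // Declare a VARCHAR(255) whose data is hinted to never contain nulls.
        DataType nameType = DataTypes.VARCHAR(255).notNull();

        // The mandatory logical type carries the SQL semantics.
        LogicalType logical = nameType.getLogicalType();

        // The optional physical hint falls back to the logical type's
        // default conversion class when none was set manually.
        Class<?> conversion = nameType.getConversionClass();

        System.out.println(logical + " -> " + conversion.getName());
    }
}
```
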
[jira] [Assigned] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager

2019-05-08 Thread zhangminglei (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangminglei reassigned FLINK-9410:
---

Assignee: zhangminglei  (was: lihongli)

> Replace NMClient with NMClientAsync in YarnResourceManager
> --
>
> Key: FLINK-9410
> URL: https://issues.apache.org/jira/browse/FLINK-9410
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: zhangminglei
>Priority: Critical
>
> Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} 
> which is called from within the main thread of the {{ResourceManager}}. Since 
> these operations are blocking, we should replace the client with the 
> {{NMClientAsync}} and make the calls non-blocking.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

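As background on the problem class, the sketch below contrasts the blocking call pattern with the asynchronous one that motivates this ticket. The interfaces are hypothetical stand-ins, not the real Hadoop NMClient/NMClientAsync signatures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical stand-ins for YARN's NMClient / NMClientAsync; the real Hadoop
// interfaces have different signatures, this only shows the calling pattern.
interface BlockingNodeManagerClient {
    void startContainer(String containerId);              // blocks until YARN answers
}

interface AsyncNodeManagerClient {
    CompletableFuture<Void> startContainerAsync(String containerId);
}

public class AsyncClientSketch {

    // Called from the ResourceManager's single main thread: a blocking call here
    // stalls all other RPC processing for the duration of the request.
    static void startBlocking(BlockingNodeManagerClient client, String containerId) {
        client.startContainer(containerId);
    }

    // With an async client, the main thread only schedules the request and handles
    // the outcome in a callback, so it never blocks on the node manager.
    static void startNonBlocking(AsyncNodeManagerClient client, String containerId, Executor mainThread) {
        client.startContainerAsync(containerId)
              .whenCompleteAsync((ignored, error) -> {
                  if (error != null) {
                      System.err.println("Starting container " + containerId + " failed: " + error);
                  }
              }, mainThread);
    }
}
```
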

[jira] [Created] (FLINK-12456) Wrong CharType conversion in type_utils.py

2019-05-08 Thread Jingsong Lee (JIRA)
Jingsong Lee created FLINK-12456:


 Summary: Wrong CharType conversion in type_utils.py
 Key: FLINK-12456
 URL: https://issues.apache.org/jira/browse/FLINK-12456
 Project: Flink
  Issue Type: Bug
Reporter: Jingsong Lee


In types.py, CharType is defined as SQL CHAR, but SQL CHAR is a Java String 
instead of a Java Character.

In type_utils.py, org.apache.flink.api.common.typeinfo.Types.CHAR is mapped to 
DataTypes.CHAR, which is wrong: Types.CHAR is a Java Character.

I suggest we consider removing CHAR support first.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

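A small Java illustration of the mismatch described above (the concrete classes exist in Flink, but the comparison is only illustrative; the actual bug is in the Python-to-Java mapping in type_utils.py):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class CharTypeMismatchSketch {
    public static void main(String[] args) {
        // TypeInformation CHAR describes a single java.lang.Character ...
        TypeInformation<Character> javaChar = Types.CHAR;

        // ... while SQL CHAR(n) is a fixed-length character string.
        DataType sqlChar = DataTypes.CHAR(10);

        System.out.println(javaChar.getTypeClass());      // java.lang.Character
        System.out.println(sqlChar.getConversionClass()); // expected: java.lang.String
    }
}
```
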

[GitHub] [flink] wuchong commented on issue #8376: [FLINK-12392] [table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12

2019-05-08 Thread GitBox
wuchong commented on issue #8376: [FLINK-12392] [table-planner-blink] Port 
FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
URL: https://github.com/apache/flink/pull/8376#issuecomment-490752948
 
 
   Thanks for digging into this. 
   
   LGTM.
   
   Will merge after travis turns green.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8377: [FLINK-12455][python] Move the packaging of pyflink to flink-dist

2019-05-08 Thread GitBox
flinkbot commented on issue #8377: [FLINK-12455][python] Move the packaging of 
pyflink to flink-dist
URL: https://github.com/apache/flink/pull/8377#issuecomment-490752300
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12455) Move the packaging of pyflink to flink-dist

2019-05-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12455:
---
Labels: pull-request-available  (was: )

> Move the packaging of pyflink to flink-dist
> ---
>
> Key: FLINK-12455
> URL: https://issues.apache.org/jira/browse/FLINK-12455
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
>
> Currently, there is a pom.xml under the module flink-python which is responsible 
> for the packaging of pyflink. The packaging logic should be moved to flink-dist 
> and then we can remove the pom.xml under flink-python and make flink-python a 
> pure Python module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dianfu opened a new pull request #8377: [FLINK-12455][python] Move the packaging of pyflink to flink-dist

2019-05-08 Thread GitBox
dianfu opened a new pull request #8377: [FLINK-12455][python] Move the 
packaging of pyflink to flink-dist
URL: https://github.com/apache/flink/pull/8377
 
 
   ## What is the purpose of the change
   
   *This pull request moves the packaging of pyflink to flink-dist for the 
following reasons: *
- *flink-dist is responsible for the packaging of Flink, so moving the 
packaging logic there is more straightforward*
- *The pom.xml under flink-python can be removed and this makes 
flink-python a pure python module*
   
   ## Brief change log
   
 - *Moves the packaging logic to flink-dist*
 - *Removes the pom.xml under flink-python*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12455) Move the packaging of pyflink to flink-dist

2019-05-08 Thread Dian Fu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-12455:

Summary: Move the packaging of pyflink to flink-dist  (was: Move the 
package of pyflink to flink-dist)

> Move the packaging of pyflink to flink-dist
> ---
>
> Key: FLINK-12455
> URL: https://issues.apache.org/jira/browse/FLINK-12455
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>
> Currently, there is a pom.xml under the module flink-python which is responsible 
> for the packaging of pyflink. The packaging logic should be moved to flink-dist 
> and then we can remove the pom.xml under flink-python and make flink-python a 
> pure Python module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282339745
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogBaseTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateCatalogBaseTable(newCatalogTable);
+
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   // TODO: [FLINK-12452] alterTable() in all 
catalogs should ensure existing base table and the new one are of the same type
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   

[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282337775
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   protected abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
 
 Review comment:
   This would go against the previous suggestion you made:
   
   > Similar to createTable(), I don't see we need to provide implementations 
for alterTable() in each subclass because I don't really see much difference. 
If validation is different, then we can define a new interface in the base and 
have each subclass implement it.
   
   How about leaving this as is and refactoring if necessary in the future?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

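For context, a stripped-down sketch of the pattern being debated: shared table logic stays in the base class and only validation is pushed into subclasses. The classes and bodies below are simplified placeholders, not the actual PR code (the real classes are HiveCatalogBase, HiveCatalog and GenericHiveMetastoreCatalog in flink-connector-hive).

```java
// Simplified, hypothetical sketch of the base-class-with-validation-hook pattern.
abstract class CatalogBaseSketch {

    // Subclass-specific hook: each catalog validates tables differently.
    protected abstract void validateCatalogBaseTable(Object catalogBaseTable);

    // Shared logic stays in the base class and delegates to the hook.
    public final void createTable(String tablePath, Object table, boolean ignoreIfExists) {
        validateCatalogBaseTable(table);
        // ... shared Hive metastore client calls, as in the diff above ...
    }
}

class GenericCatalogSketch extends CatalogBaseSketch {
    @Override
    protected void validateCatalogBaseTable(Object catalogBaseTable) {
        // checks specific to Flink-managed (generic) tables would go here
    }
}
```
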

[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282337926
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +332,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map<String, String> retrieveFlinkProperties(Map<String, String> 
hiveTableParams) {
 
 Review comment:
   I'd just leave them as-is, since I don't see much value in making it static 
given that no other class uses it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

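For readers without the full file at hand, a minimal sketch of what a prefix-based property filter like retrieveFlinkProperties() might look like; the `flink.` prefix and the exact transformation are assumptions for illustration, not copied from the PR.

```java
import java.util.Map;
import java.util.stream.Collectors;

final class FlinkPropertyFilterSketch {

    // Assumed prefix marking Flink-created properties in the Hive table params.
    private static final String FLINK_PROPERTY_PREFIX = "flink.";

    // Keeps only Flink-created properties and strips the prefix from the keys.
    static Map<String, String> retrieveFlinkProperties(Map<String, String> hiveTableParams) {
        return hiveTableParams.entrySet().stream()
                .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
                .collect(Collectors.toMap(
                        e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()),
                        Map.Entry::getValue));
    }
}
```
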

[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282337926
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +332,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map<String, String> retrieveFlinkProperties(Map<String, String> 
hiveTableParams) {
 
 Review comment:
   What's the key point of making it static given no other class uses it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282337775
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   protected abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
 
 Review comment:
   This would go against the previous suggestion you made:
   
   > Similar to createTable(), I don't see we need to provide implementations 
for alterTable() in each subclass because I don't really see much difference. 
If validation is different, then we can define a new interface in the base and 
have each subclass implement it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config for yarn configuration

2019-05-08 Thread GitBox
HuangZhenQiu commented on issue #8303: [FLINK-12343]add file replication config 
for yarn configuration
URL: https://github.com/apache/flink/pull/8303#issuecomment-490741884
 
 
   @xintongsong  @tillrohrmann @rmetzger 
   Thanks for the informative comments. I verified the solution end to end. 
It works. I will continue by adding an integration test in the flink-yarn-tests 
module to the PR later.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12455) Move the package of pyflink to flink-dist

2019-05-08 Thread Dian Fu (JIRA)
Dian Fu created FLINK-12455:
---

 Summary: Move the package of pyflink to flink-dist
 Key: FLINK-12455
 URL: https://issues.apache.org/jira/browse/FLINK-12455
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


Currently, there is a pom.xml under the module flink-python which is responsible 
for the packaging of pyflink. The packaging logic should be moved to flink-dist and 
then we can remove the pom.xml under flink-python and make flink-python a pure 
Python module.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wuchong commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system

2019-05-08 Thread GitBox
wuchong commented on a change in pull request #8360: 
[FLINK-12393][table-common] Add the user-facing classes of the new type system
URL: https://github.com/apache/flink/pull/8360#discussion_r282335432
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
 ##
 @@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.AnyType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import 
org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationAnyType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import 
org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A {@link DataType} can be used to declare input and/or output types of 
operations. This class
+ * enumerates all supported data types of the Table & SQL API.
+ */
+@PublicEvolving
+public final class DataTypes {
+
+   // we use SQL-like naming for data types and avoid Java keyword clashes
+   // CHECKSTYLE.OFF: MethodName
+
+   /**
+* Data type of a fixed-length character string {@code CHAR(n)} where 
{@code n} is the number
+* of code points. {@code n} must have a value between 1 and 255 (both 
inclusive).
+*
+* @see CharType
+*/
+   public static DataType.AtomicDataType CHAR(int n) {
+   return new DataType.AtomicDataType(new CharType(n));
+   }
+
+   /**
+* Data type of a variable-length character string {@code VARCHAR(n)} 
where {@code n} is the
+* maximum number of code points. {@code n} must have a value between 1 
and {@link Integer#MAX_VALUE}
+* (both inclusive).
+*
+* @see VarCharType
+*/
+   public static DataType.AtomicDataType VARCHAR(int n) {
+   return new DataType.AtomicDataType(new VarCharType(n));
+   }
+
+   /**
+* Data type of a variable-length character string with defined maximum 
length. This is a shortcut
+* for {@code VARCHAR(2147483647)} for representing JVM strings.
+*
+* @see VarCharType
+  

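As a quick illustration of the factory style shown in the excerpt (a sketch only; the length bounds follow the quoted Javadoc, and the final API may differ):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class DataTypesFactorySketch {
    public static void main(String[] args) {
        DataType countryCode = DataTypes.CHAR(2);    // fixed length, 1..255 code points
        DataType comment = DataTypes.VARCHAR(1024);  // variable length, 1..Integer.MAX_VALUE
        System.out.println(countryCode + ", " + comment);
    }
}
```
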
[GitHub] [flink] wuchong commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system

2019-05-08 Thread GitBox
wuchong commented on a change in pull request #8360: 
[FLINK-12393][table-common] Add the user-facing classes of the new type system
URL: https://github.com/apache/flink/pull/8360#discussion_r282334560
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
 ##
 @@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Describes the data type of a value in the table ecosystem. Instances of 
this class can be used to
+ * declare input and/or output types of operations.
+ *
+ * The {@link DataType} class has two responsibilities: declaring a logical 
type and giving hints
+ * about the physical representation of data to the optimizer. While the 
logical type is mandatory,
+ * hints are optional but useful at the edges to other APIs.
+ *
+ * The logical type is independent of any physical representation and is 
close to the "data type"
+ * terminology of the SQL standard. See {@link 
org.apache.flink.table.types.logical.LogicalType} and
+ * its subclasses for more information about available logical types and their 
properties.
+ *
+ * Physical hints are required at the edges of the table ecosystem. Hints 
indicate the data format
+ * that an implementation expects. For example, a data source could express 
that it produces values for
+ * logical timestamps using a {@link java.sql.Timestamp} class instead of 
using {@link java.time.LocalDateTime}.
+ * With this information, the runtime is able to convert the produced class 
into its internal data
+ * format. In return, a data sink can declare the data format it consumes from 
the runtime.
+ *
+ * @see DataTypes for a list of supported data types and instances of this 
class.
+ */
+@PublicEvolving
+public abstract class DataType implements Serializable {
+
+   protected LogicalType logicalType;
+
+   protected @Nullable Class<?> conversionClass;
+
+   private DataType(LogicalType logicalType, @Nullable Class<?> 
conversionClass) {
+   this.logicalType = Preconditions.checkNotNull(logicalType, 
"Logical type must not be null.");
+   this.conversionClass = performEarlyClassValidation(logicalType, 
conversionClass);
+   }
+
+   /**
+* Returns the corresponding logical type.
+*
+* @return a parameterized instance of {@link LogicalType}
+*/
+   public LogicalType getLogicalType() {
+   return logicalType;
+   }
+
+   /**
+* Returns the corresponding conversion class for representing values. 
If no conversion class was
+* defined manually, the default conversion defined by the logical type 
is used.
+*
+* @see LogicalType#getDefaultConversion()
+*
+* @return the expected conversion class
+*/
+   public Class<?> getConversionClass() {
+   if (conversionClass == null) {
+   return logicalType.getDefaultConversion();
+   }
+   return conversionClass;
+   }
+
+   /**
+* Adds a hint that null values are not expected in the data for this 
type.
+*
+* @return a new, reconfigured data type instance
+*/
+   public abstract DataType notNull();
+
+   /**
+* Adds a hint that null values are expected in the data for this type 
(default behavior).
+*
+* This method exists for explicit declaration of the default 
behavior or for invalidation of
+* a previous call to {@link #notNull()}.
+*
+* @return a new, reconfigured data type instance
+*/
+   public abstract DataType andNull();
 
 Review comment:
   How about rename `andNull()` --> `asNullable()` and 

[GitHub] [flink] wuchong commented on a change in pull request #8360: [FLINK-12393][table-common] Add the user-facing classes of the new type system

2019-05-08 Thread GitBox
wuchong commented on a change in pull request #8360: 
[FLINK-12393][table-common] Add the user-facing classes of the new type system
URL: https://github.com/apache/flink/pull/8360#discussion_r282334300
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
 ##
 @@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Describes the data type of a value in the table ecosystem. Instances of 
this class can be used to
+ * declare input and/or output types of operations.
+ *
+ * The {@link DataType} class has two responsibilities: declaring a logical 
type and giving hints
+ * about the physical representation of data to the optimizer. While the 
logical type is mandatory,
+ * hints are optional but useful at the edges to other APIs.
+ *
+ * The logical type is independent of any physical representation and is 
close to the "data type"
+ * terminology of the SQL standard. See {@link 
org.apache.flink.table.types.logical.LogicalType} and
+ * its subclasses for more information about available logical types and their 
properties.
+ *
+ * Physical hints are required at the edges of the table ecosystem. Hints 
indicate the data format
+ * that an implementation expects. For example, a data source could express 
that it produces values for
+ * logical timestamps using a {@link java.sql.Timestamp} class instead of 
using {@link java.time.LocalDateTime}.
+ * With this information, the runtime is able to convert the produced class 
into its internal data
+ * format. In return, a data sink can declare the data format it consumes from 
the runtime.
+ *
+ * @see DataTypes for a list of supported data types and instances of this 
class.
+ */
+@PublicEvolving
+public abstract class DataType implements Serializable {
+
+   protected LogicalType logicalType;
+
+   protected @Nullable Class<?> conversionClass;
+
+   private DataType(LogicalType logicalType, @Nullable Class<?> 
conversionClass) {
+   this.logicalType = Preconditions.checkNotNull(logicalType, 
"Logical type must not be null.");
+   this.conversionClass = performEarlyClassValidation(logicalType, 
conversionClass);
+   }
+
+   /**
+* Returns the corresponding logical type.
+*
+* @return a parameterized instance of {@link LogicalType}
+*/
+   public LogicalType getLogicalType() {
+   return logicalType;
+   }
+
+   /**
+* Returns the corresponding conversion class for representing values. 
If no conversion class was
+* defined manually, the default conversion defined by the logical type 
is used.
+*
+* @see LogicalType#getDefaultConversion()
+*
+* @return the expected conversion class
+*/
+   public Class<?> getConversionClass() {
+   if (conversionClass == null) {
+   return logicalType.getDefaultConversion();
+   }
+   return conversionClass;
+   }
+
+   /**
+* Adds a hint that null values are not expected in the data for this 
type.
+*
+* @return a new, reconfigured data type instance
+*/
+   public abstract DataType notNull();
+
+   /**
+* Adds a hint that null values are expected in the data for this type 
(default behavior).
+*
+* This method exists for explicit declaration of the default 
behavior or for invalidation of
+* a previous call to {@link #notNull()}.
+*
+* @return a new, reconfigured data type instance
+*/
+   public abstract DataType andNull();
+
+   /**
+* Adds a hint that data should be represented using 

[GitHub] [flink] chunweilei commented on a change in pull request #8266: [FLINK-9904]Allow users to control MaxDirectMemorySize

2019-05-08 Thread GitBox
chunweilei commented on a change in pull request #8266: [FLINK-9904]Allow users 
to control MaxDirectMemorySize
URL: https://github.com/apache/flink/pull/8266#discussion_r282333076
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/config.sh
 ##
 @@ -429,6 +430,11 @@ if [ -z "${FLINK_TM_NET_BUF_MAX}" -o 
"${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
 FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
 fi
 
+# Define FLINK_TM_MAX_OFFHEAP_SIZE if it is not already set
+if [ -z "${FLINK_TM_MAX_OFFHEAP_SIZE}" ]; then
+# default: Long.MAX_VALUE in TB
+FLINK_TM_MAX_OFFHEAP_SIZE=$(readFromConfig ${KEY_TASKM_MAX_OFFHEAP_SIZE} 
"8388607T" "${YAML_CONF}")
 
 Review comment:
   Maybe it is set to -1 by the user incorrectly. You can find other configurations 
like `FLINK_TM_NET_BUF_MAX` that use this condition too.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-08 Thread GitBox
eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#issuecomment-490734729
 
 
   Hi, @GJL @tillrohrmann , I have updated the PR based on the latest master. 
Would you mind beginning another round of code review?
   
   As discussed in the first round of code review, there are still some open 
questions; I have summarized them here for convenience of discussion. Any 
comments are highly appreciated.
   
   1.  
[[SchedulePolicy](https://github.com/apache/flink/pull/8309#discussion_r281049742)]
 Using vertex state transitions to schedule vertices has the benefit of 
avoiding a flood of `onPartitionConsumable` notifications, but consumers of 
PIPELINED shuffle-mode results may be left waiting idle. So, I think we could 
keep the benefit by relying on both vertex state transitions and 
`onPartitionConsumable` notifications.
   1) `DeploymentOption#sendScheduleOrUpdateConsumerMessage` set to 
true if the vertex has PIPELINED produced result partition and set to false if 
all the produced result partitions are BLOCKING
2) Schedule vertices with BLOCKING input result partition using vertex 
state transition.
3) Schedule vertices with PIPELINED input result partitions using 
`onPartitionConsumable` notification.
   
   2. [[JobGraph 
Usage](https://github.com/apache/flink/pull/8309#discussion_r281037134)] The 
only usage of `JobGraph` is to provide `InputDependencyConstraint` in 
`LazyFromSourcesSchedulingStrategy`, while, it is not used in 
`EagerSchedulingStrategy`. Maybe we could remove `JobGraph` from 
`SchedulingStrategyFactory#createInstance` and add `InputDependencyConstraint` 
information into `SchedulingTopology`, which needs a new interface in 
`SchedulingTopology`:
   `InputDependencyConstraint getInputDependencyConstraint(JobVertexID 
jobVertexId)`?
   
   3. [[ANY/ALL Schedule 
Granularity](https://issues.apache.org/jira/browse/FLINK-12229)]  In the 
original scheduler, the schedule granularity is ANY/ALL of the IntermediateDataSet 
finishing, and using result-partition granularity could speed up deployments 
but may cause a flood of partition-update network communication and resource 
deadlocks. Thus, in this PR my implementation is consistent with the original 
logic. 
   However, we are wondering whether we could use some method to keep the 
deployment speedup advantage while avoiding the partition-update flood and 
resource deadlock. Based on our production experience, we propose to introduce 
a new trigger `InputDependencyConstraint#Progress`, a float between 0.0 and 1.0 
identifying the percentage of finished input result partitions. 1.0 means ALL 
input result partitions are finished; we configured it to 0.8 as the default in 
our production system to balance the speedup advantage against the 
partition-update flood and possible resource deadlock.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

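A sketch of the interface addition proposed in point 2 of the comment above; the method name follows the comment, while the interface shape and imports are assumptions for illustration (the real SchedulingTopology lives in flink-runtime and may differ).

```java
import org.apache.flink.api.common.InputDependencyConstraint;
import org.apache.flink.runtime.jobgraph.JobVertexID;

// Hypothetical shape of the proposed accessor, so that
// LazyFromSourcesSchedulingStrategy can read a vertex's constraint
// without holding a reference to the whole JobGraph.
public interface SchedulingTopologySketch {

    InputDependencyConstraint getInputDependencyConstraint(JobVertexID jobVertexId);
}
```
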

[GitHub] [flink] eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-08 Thread GitBox
eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#issuecomment-490734729
 
 
   Hi, @GJL @tillrohrmann , I have updated the PR based on the latest master. 
Would you mind beginning another round of code review?
   
   As discussed in the first round of code review, there are still some open 
questions; I have summarized them here for convenience of discussion. Any 
comments are highly appreciated.
   
   1.  
[[SchedulePolicy](https://github.com/apache/flink/pull/8309#discussion_r281049742)]
 Using vertex state transitions to schedule vertices has the benefit of 
avoiding a flood of `onPartitionConsumable` notifications, but consumers of 
PIPELINED shuffle-mode results may be left waiting idle. So, I think we could 
keep the benefit by relying on both vertex state transitions and 
`onPartitionConsumable` notifications.
   1) `DeploymentOption#sendScheduleOrUpdateConsumerMessage` set to 
true if the vertex has PIPELINED produced result partition and set to false if 
all the produced result partitions are BLOCKING
2) Schedule vertices with BLOCKING input result partition using vertex 
state transition.
3) Schedule vertices with PIPELINED input result partitions using 
`onPartitionConsumable` notification.
   
   2. [[JobGraph 
Usage](https://github.com/apache/flink/pull/8309#discussion_r281037134)] The 
only usage of `JobGraph` is to provide `InputDependencyConstraint` in 
`LazyFromSourcesSchedulingStrategy`, while, it is not used in 
`EagerSchedulingStrategy`. Maybe we could remove `JobGraph` from 
`SchedulingStrategyFactory#createInstance` and add `InputDependencyConstraint` 
information into `SchedulingTopology`, which needs a new interface in 
`SchedulingTopology`:
   `InputDependencyConstraint getInputDependencyConstraint(JobVertexID 
jobVertexId)`?
   
   3. [[ANY/ALL Schedule 
Granularity](https://issues.apache.org/jira/browse/FLINK-12229)]  In the 
original scheduler, the schedule granularity is ANY/ALL of the IntermediateDataSet 
finishing, and using result-partition granularity could speed up deployments 
but may cause a flood of partition-update network communication and resource 
deadlocks. Thus, in this PR my implementation is consistent with the original 
logic. 
   However, we are wondering whether we could use some method to keep the 
deployment speedup advantage while avoiding the partition-update flood and 
resource deadlock. Based on our production experience, we propose to introduce 
a new trigger `InputDependencyConstraint#Progress`, a float between 0.0 and 1.0 
identifying the percentage of finished input result partitions. 1.0 means ALL 
input result partitions are finished; we configured it to 0.8 in our production 
system as the default to balance the speedup advantage against the 
partition-update flood and possible resource deadlock.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12454) Add custom check options for lint-python.sh

2019-05-08 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12454:

Labels:   (was: Python)

> Add custom check options for lint-python.sh
> ---
>
> Key: FLINK-12454
> URL: https://issues.apache.org/jira/browse/FLINK-12454
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>
> At present, lint-python.sh has Python compatibility checks and code style 
> checks. By default, all checks are executed. In the 
> development stage, only the code style check is required in some cases. 
> Therefore, in this JIRA, parameters to specify the check items are added for 
> lint-python.sh.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12454) Add custom check options for lint-python.sh

2019-05-08 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12454:

Labels: Python  (was: )

> Add custom check options for lint-python.sh
> ---
>
> Key: FLINK-12454
> URL: https://issues.apache.org/jira/browse/FLINK-12454
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: Python
>
> At present, lint-python.sh has Python compatibility checks and code style 
> checks. By default, all checks are executed. In the 
> development stage, only the code style check is required in some cases. 
> Therefore, in this JIRA, parameters to specify the check items are added for 
> lint-python.sh.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12454) Add custom check options for lint-python.sh

2019-05-08 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-12454:

Description: At present, lint-python.sh has Python compatibility checks and 
code style checks. By default, all checks are executed. In the 
development stage, only the code style check is required in some cases. Therefore, in 
this JIRA, parameters to specify the check items are added for lint-python.sh.  
(was: At present, lint-python.sh has Python compatibility checks and code style 
checks. By default, all checks are executed. In the development 
stage, only one check may be required in some cases. Therefore, in this JIRA, 
parameters to specify the check items are added for lint-python.sh.)

> Add custom check options for lint-python.sh
> ---
>
> Key: FLINK-12454
> URL: https://issues.apache.org/jira/browse/FLINK-12454
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Python
>Affects Versions: 1.9.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>
> At present, lint-python.sh has Python compatibility checks and code style 
> checks. By default, all checks are executed. In the 
> development stage, only the code style check is required in some cases. 
> Therefore, in this JIRA, parameters to specify the check items are added for 
> lint-python.sh.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12454) Add custom check options for lint-python.sh

2019-05-08 Thread sunjincheng (JIRA)
sunjincheng created FLINK-12454:
---

 Summary: Add custom check options for lint-python.sh
 Key: FLINK-12454
 URL: https://issues.apache.org/jira/browse/FLINK-12454
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Affects Versions: 1.9.0
Reporter: sunjincheng
Assignee: sunjincheng


At present, lint-python.sh has Python compatibility checks and code style 
checks, and all of them are executed by default. During development, only one 
check may be required in some cases. Therefore, this JIRA adds parameters to 
lint-python.sh for specifying which checks to run.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] eaglewatcherwb commented on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-08 Thread GitBox
eaglewatcherwb commented on issue #8309: [FLINK-12229] [runtime] Implement 
LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#issuecomment-490734729
 
 
   Hi @GJL @tillrohrmann, I have updated the PR based on the latest master. 
Would you mind starting another round of code review?
   
   As discussed in the first round of code review, there are still some open 
questions; I have summarized them below for the convenience of discussion.
   
   1.  
[[SchedulePolicy](https://github.com/apache/flink/pull/8309#discussion_r281049742)]
 Using vertex state transitions to schedule vertices has the benefit of 
avoiding a flood of `onPartitionConsumable` notifications, but consumers of 
PIPELINED shuffle mode may then sit idle waiting for results. So I think we 
could keep the benefit by relying on both vertex state transitions and 
`onPartitionConsumable` notifications:
1) Set `DeploymentOption#sendScheduleOrUpdateConsumerMessage` to true 
if the vertex has a PIPELINED produced result partition, and to false if all 
the produced result partitions are BLOCKING.
2) Schedule vertices with BLOCKING input result partitions using vertex 
state transitions.
3) Schedule vertices with PIPELINED input result partitions using 
`onPartitionConsumable` notifications.
   
   2. [[JobGraph 
Usage](https://github.com/apache/flink/pull/8309#discussion_r281037134)] The 
only usage of `JobGraph` is to provide the `InputDependencyConstraint` to 
`LazyFromSourcesSchedulingStrategy`; it is not used in 
`EagerSchedulingStrategy`. Maybe we could remove `JobGraph` from 
`SchedulingStrategyFactory#createInstance` and add the 
`InputDependencyConstraint` information to `SchedulingTopology`, which would 
need a new method on `SchedulingTopology`:
   `InputDependencyConstraint getInputDependencyConstraint(JobVertexID 
jobVertexId)`?
   
   3. [[ANY/ALL Schedule 
Granularity](https://issues.apache.org/jira/browse/FLINK-12229)]  In the 
original scheduler, the schedule granularity is ANY/ALL of the 
IntermediateDataSet finishing. Using result-partition granularity could speed 
up deployments, but it may involve a flood of partition-update network 
communication and possible resource deadlocks. Thus, in this PR my 
implementation is consistent with the original logic.
   However, we are wondering whether we could keep the deployment speedup while 
avoiding the flood of partition updates and the resource deadlocks. Based on 
our production experience, we propose to introduce a new trigger 
`InputDependencyConstraint#Progress`, a float between 0.0 and 1.0 that gives 
the fraction of finished input result partitions. 1.0 means ALL the input 
result partitions have finished; we configured it to 0.8 as the default to 
balance the speedup advantage against the partition-update flood and possible 
resource deadlocks (a small illustrative sketch follows below).
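   To make the proposal concrete, here is a minimal, purely illustrative sketch 
of such a progress-based check. The class and method names (`ProgressConstraint`, 
`isSatisfied`, ...) are placeholders, not code from this PR:
   
```java
/**
 * Illustrative sketch of the proposed InputDependencyConstraint#Progress idea:
 * a vertex becomes schedulable once the fraction of finished input result
 * partitions reaches a configurable threshold (1.0 corresponds to today's ALL).
 */
public final class ProgressConstraint {

    private final double threshold; // e.g. 0.8, the default suggested above

    public ProgressConstraint(double threshold) {
        if (threshold < 0.0 || threshold > 1.0) {
            throw new IllegalArgumentException("threshold must be within [0.0, 1.0]");
        }
        this.threshold = threshold;
    }

    /** Returns true if enough input result partitions have finished. */
    public boolean isSatisfied(int finishedPartitions, int totalPartitions) {
        if (totalPartitions == 0) {
            return true; // a vertex without inputs is always schedulable
        }
        return (double) finishedPartitions / totalPartitions >= threshold;
    }
}
```
   With a threshold of 1.0 this degenerates to the existing ALL behaviour, while a 
value such as 0.8 trades some extra partition-update traffic for earlier deployments.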
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] Implement LazyFromSourcesScheduling Strategy

2019-05-08 Thread GitBox
eaglewatcherwb edited a comment on issue #8309: [FLINK-12229] [runtime] 
Implement LazyFromSourcesScheduling Strategy
URL: https://github.com/apache/flink/pull/8309#issuecomment-490734729
 
 
   Hi @GJL @tillrohrmann, I have updated the PR based on the latest master. 
Would you mind starting another round of code review?
   
   As discussed in the first round of code review, there are still some open 
questions; I have summarized them below for the convenience of discussion. Any 
comment is highly appreciated.
   
   1.  
[[SchedulePolicy](https://github.com/apache/flink/pull/8309#discussion_r281049742)]
 Using vertex state transitions to schedule vertices has the benefit of 
avoiding a flood of `onPartitionConsumable` notifications, but consumers of 
PIPELINED shuffle mode may then sit idle waiting for results. So I think we 
could keep the benefit by relying on both vertex state transitions and 
`onPartitionConsumable` notifications:
   1) Set `DeploymentOption#sendScheduleOrUpdateConsumerMessage` to 
true if the vertex has a PIPELINED produced result partition, and to false if 
all the produced result partitions are BLOCKING.
2) Schedule vertices with BLOCKING input result partitions using vertex 
state transitions.
3) Schedule vertices with PIPELINED input result partitions using 
`onPartitionConsumable` notifications.
   
   2. [[JobGraph 
Usage](https://github.com/apache/flink/pull/8309#discussion_r281037134)] The 
only usage of `JobGraph` is to provide the `InputDependencyConstraint` to 
`LazyFromSourcesSchedulingStrategy`; it is not used in 
`EagerSchedulingStrategy`. Maybe we could remove `JobGraph` from 
`SchedulingStrategyFactory#createInstance` and add the 
`InputDependencyConstraint` information to `SchedulingTopology`, which would 
need a new method on `SchedulingTopology`:
   `InputDependencyConstraint getInputDependencyConstraint(JobVertexID 
jobVertexId)`?
   
   3. [[ANY/ALL Schedule 
Granularity](https://issues.apache.org/jira/browse/FLINK-12229)]  In the 
original scheduler, the schedule granularity is ANY/ALL of the 
IntermediateDataSet finishing. Using result-partition granularity could speed 
up deployments, but it may involve a flood of partition-update network 
communication and possible resource deadlocks. Thus, in this PR my 
implementation is consistent with the original logic.
   However, we are wondering whether we could keep the deployment speedup while 
avoiding the flood of partition updates and the resource deadlocks. Based on 
our production experience, we propose to introduce a new trigger 
`InputDependencyConstraint#Progress`, a float between 0.0 and 1.0 that gives 
the fraction of finished input result partitions. 1.0 means ALL the input 
result partitions have finished; we configured it to 0.8 as the default to 
balance the speedup advantage against the partition-update flood and possible 
resource deadlocks.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] godfreyhe opened a new pull request #8376: [FLINK-12392] [table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12

2019-05-08 Thread GitBox
godfreyhe opened a new pull request #8376: [FLINK-12392] [table-planner-blink] 
Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
URL: https://github.com/apache/flink/pull/8376
 
 
   
   ## What is the purpose of the change
   
   *Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 
2.12*
   
   
   ## Brief change log
   
 - *Port FlinkRelMetadataQuery from Scala into Java*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
*FlinkRelMdXXXTest*.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12392) FlinkRelMetadataQuery does not compile with Scala 2.12

2019-05-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12392:
---
Labels: pull-request-available  (was: )

> FlinkRelMetadataQuery does not compile with Scala 2.12
> --
>
> Key: FLINK-12392
> URL: https://issues.apache.org/jira/browse/FLINK-12392
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: godfrey he
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> {code}
> 10:57:51.770 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala:52:
>  error: value EMPTY in class RelMetadataQuery cannot be accessed in object 
> org.apache.calcite.rel.metadata.RelMetadataQuery
> 10:57:51.770 [ERROR]  Access to protected value EMPTY not permitted because
> 10:57:51.770 [ERROR]  enclosing package metadata in package plan is not a 
> subclass of
> 10:57:51.770 [ERROR]  class RelMetadataQuery in package metadata where target 
> is defined
> 10:57:51.770 [ERROR] this(RelMetadataQuery.THREAD_PROVIDERS.get, 
> RelMetadataQuery.EMPTY)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8376: [FLINK-12392] [table-planner-blink] Port FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12

2019-05-08 Thread GitBox
flinkbot commented on issue #8376: [FLINK-12392] [table-planner-blink] Port 
FlinkRelMetadataQuery into Java to avoid compiling error with Scala 2.12
URL: https://github.com/apache/flink/pull/8376#issuecomment-490732935
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-12392) FlinkRelMetadataQuery does not compile with Scala 2.12

2019-05-08 Thread godfrey he (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he reassigned FLINK-12392:
--

Assignee: godfrey he

> FlinkRelMetadataQuery does not compile with Scala 2.12
> --
>
> Key: FLINK-12392
> URL: https://issues.apache.org/jira/browse/FLINK-12392
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Assignee: godfrey he
>Priority: Blocker
> Fix For: 1.9.0
>
>
> {code}
> 10:57:51.770 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala:52:
>  error: value EMPTY in class RelMetadataQuery cannot be accessed in object 
> org.apache.calcite.rel.metadata.RelMetadataQuery
> 10:57:51.770 [ERROR]  Access to protected value EMPTY not permitted because
> 10:57:51.770 [ERROR]  enclosing package metadata in package plan is not a 
> subclass of
> 10:57:51.770 [ERROR]  class RelMetadataQuery in package metadata where target 
> is defined
> 10:57:51.770 [ERROR] this(RelMetadataQuery.THREAD_PROVIDERS.get, 
> RelMetadataQuery.EMPTY)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wenhuitang commented on issue #7805: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs.

2019-05-08 Thread GitBox
wenhuitang commented on issue #7805: [FLINK-11697] Add readMapReduceTextFile 
API for HadoopInputs.
URL: https://github.com/apache/flink/pull/7805#issuecomment-490729884
 
 
   Opened another PR (https://github.com/apache/flink/pull/8375) because this 
PR is from an unknown repository, so closing this one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8375: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs.

2019-05-08 Thread GitBox
flinkbot commented on issue #8375: [FLINK-11697] Add readMapReduceTextFile API 
for HadoopInputs.
URL: https://github.com/apache/flink/pull/8375#issuecomment-490729901
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wenhuitang closed pull request #7805: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs.

2019-05-08 Thread GitBox
wenhuitang closed pull request #7805: [FLINK-11697] Add readMapReduceTextFile 
API for HadoopInputs.
URL: https://github.com/apache/flink/pull/7805
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wenhuitang opened a new pull request #8375: [FLINK-11697] Add readMapReduceTextFile API for HadoopInputs.

2019-05-08 Thread GitBox
wenhuitang opened a new pull request #8375: [FLINK-11697] Add 
readMapReduceTextFile API for HadoopInputs.
URL: https://github.com/apache/flink/pull/8375
 
 
   ## What is the purpose of the change
   
This pull request adds a readMapReduceTextFile API to HadoopInputs (a 
hypothetical usage sketch follows at the end of this description).
   https://issues.apache.org/jira/browse/FLINK-11697
   ## Brief change log
   Add readMapReduceTextFile for both the Java and Scala APIs.
   Add HadoopTextFormatITCase.java for mapreduce.
   Add HadoopTextFormatMapreduceITCase.scala.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
HadoopTextFormatITCase and HadoopTextFormatMapreduceITCase.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no 
 - The runtime per-record code paths (performance sensitive):  no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector:  no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  yes
 - If yes, how is the feature documented?  docs
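   
   For illustration only, here is a hypothetical usage sketch of the proposed 
helper (not code from this PR); the exact signature of `readMapReduceTextFile` is 
defined by the PR itself, and the input path below is a placeholder:
   
```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class ReadMapReduceTextFileExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical convenience method proposed by this PR; assumed to wrap a
        // mapreduce TextInputFormat into a Flink HadoopInputFormat, analogous to
        // the existing HadoopInputs.readHadoopFile(...) helpers.
        DataSet<Tuple2<LongWritable, Text>> lines =
            env.createInput(HadoopInputs.readMapReduceTextFile("hdfs:///path/to/input"));

        lines.first(10).print();
    }
}
```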
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12453) StreamExecGlobalGroupAggregate construct wrong args for AggsHandlerCodeGenerator

2019-05-08 Thread Jingsong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-12453:
-
Description: 
In generateAggsHandler: 

val generator = new AggsHandlerCodeGenerator(
   CodeGeneratorContext(config),
   relBuilder,
   FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
   needRetract = false,
   config.getNullCheck,
   inputFieldCopy)

but the AggsHandlerCodeGenerator constructor args are:

class AggsHandlerCodeGenerator(
   ctx: CodeGeneratorContext,
   relBuilder: RelBuilder,
   inputFieldTypes: Seq[InternalType],
   needRetract: Boolean,
   copyInputField: Boolean,
   needAccumulate: Boolean = true)

The same issue applies to StreamExecIncrementalGroupAggregate.

  was:
In generateAggsHandler: 

val generator = new AggsHandlerCodeGenerator(
  CodeGeneratorContext(config),
  relBuilder,
  FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
  needRetract = false,
  config.getNullCheck,
  inputFieldCopy)

but the AggsHandlerCodeGenerator constructor args are:

class AggsHandlerCodeGenerator(
  ctx: CodeGeneratorContext,
  relBuilder: RelBuilder,
  inputFieldTypes: Seq[InternalType],
  needRetract: Boolean,
  copyInputField: Boolean,
  needAccumulate: Boolean = true)


> StreamExecGlobalGroupAggregate construct wrong args for 
> AggsHandlerCodeGenerator
> 
>
> Key: FLINK-12453
> URL: https://issues.apache.org/jira/browse/FLINK-12453
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Jingsong Lee
>Assignee: Jark Wu
>Priority: Major
>
> In generateAggsHandler: 
> val generator = new AggsHandlerCodeGenerator(
>    CodeGeneratorContext(config),
>    relBuilder,
>    FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
>    needRetract = false,
>    config.getNullCheck,
>    inputFieldCopy)
> but the AggsHandlerCodeGenerator constructor args are:
> class AggsHandlerCodeGenerator(
>    ctx: CodeGeneratorContext,
>    relBuilder: RelBuilder,
>    inputFieldTypes: Seq[InternalType],
>    needRetract: Boolean,
>    copyInputField: Boolean,
>    needAccumulate: Boolean = true)
> The same issue applies to StreamExecIncrementalGroupAggregate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] wenhuitang commented on issue #7810: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs.

2019-05-08 Thread GitBox
wenhuitang commented on issue #7810: [FLINK-11698] Add readMapRedTextFile API 
for HadoopInputs.
URL: https://github.com/apache/flink/pull/7810#issuecomment-490729368
 
 
   Opened another PR (https://github.com/apache/flink/pull/8374) because this 
PR is from an unknown repository, so closing this one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wenhuitang closed pull request #7810: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs.

2019-05-08 Thread GitBox
wenhuitang closed pull request #7810: [FLINK-11698] Add readMapRedTextFile API 
for HadoopInputs.
URL: https://github.com/apache/flink/pull/7810
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12453) StreamExecGlobalGroupAggregate construct wrong args for AggsHandlerCodeGenerator

2019-05-08 Thread Jingsong Lee (JIRA)
Jingsong Lee created FLINK-12453:


 Summary: StreamExecGlobalGroupAggregate construct wrong args for 
AggsHandlerCodeGenerator
 Key: FLINK-12453
 URL: https://issues.apache.org/jira/browse/FLINK-12453
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jingsong Lee
Assignee: Jark Wu


In generateAggsHandler: 

val generator = new AggsHandlerCodeGenerator(
  CodeGeneratorContext(config),
  relBuilder,
  FlinkTypeFactory.toInternalRowType(inputRowType).getFieldTypes,
  needRetract = false,
  config.getNullCheck,
  inputFieldCopy)

but the AggsHandlerCodeGenerator constructor args are:

class AggsHandlerCodeGenerator(
  ctx: CodeGeneratorContext,
  relBuilder: RelBuilder,
  inputFieldTypes: Seq[InternalType],
  needRetract: Boolean,
  copyInputField: Boolean,
  needAccumulate: Boolean = true)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] flinkbot commented on issue #8374: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs.

2019-05-08 Thread GitBox
flinkbot commented on issue #8374: [FLINK-11698] Add readMapRedTextFile API for 
HadoopInputs.
URL: https://github.com/apache/flink/pull/8374#issuecomment-490729202
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.
## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wenhuitang opened a new pull request #8374: [FLINK-11698] Add readMapRedTextFile API for HadoopInputs.

2019-05-08 Thread GitBox
wenhuitang opened a new pull request #8374: [FLINK-11698] Add 
readMapRedTextFile API for HadoopInputs.
URL: https://github.com/apache/flink/pull/8374
 
 
   ## What is the purpose of the change
   
This pull request adds a readMapRedTextFile API to HadoopInputs.
   https://issues.apache.org/jira/browse/FLINK-11698
   
   ## Brief change log
   Add readMapRedTextFile for both the Java and Scala APIs.
   Add HadoopTextFormatITCase.java for mapred.
   Add HadoopTextFormatMapredITCase.scala.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
HadoopTextFormatITCase and HadoopTextFormatMapredITCase.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no 
 - The runtime per-record code paths (performance sensitive):  no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector:  no
   
   ## Documentation
   
 - Does this pull request introduce a new feature?  yes
 - If yes, how is the feature documented?  docs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282322682
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogBaseTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateCatalogBaseTable(newCatalogTable);
+
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   // TODO: [FLINK-12452] alterTable() in all 
catalogs should ensure existing base table and the new one are of the same type
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   

[jira] [Assigned] (FLINK-12375) flink-container job jar does not have read permissions

2019-05-08 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-12375:


Assignee: Yun Tang

> flink-container job jar does not have read permissions
> --
>
> Key: FLINK-12375
> URL: https://issues.apache.org/jira/browse/FLINK-12375
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Docker
>Reporter: Adam Lamar
>Assignee: Yun Tang
>Priority: Major
>
> When building a custom job container using flink-container, the job can't be 
> launched if the provided job jar does not have world-readable permission.
> This is because the job jar in the container is owned by root:root, but the 
> docker container executes as the flink user.
> In environments with restrictive umasks (e.g. company laptops) that create 
> files without group and other read permissions by default, this causes the 
> instructions to fail.
> To reproduce on master:
> {code:java}
> cd flink-container/docker
> cp ../../flink-examples/flink-examples-streaming/target/WordCount.jar .
> chmod go-r WordCount.jar  # still maintain user read permission
> ./build.sh --job-jar WordCount.jar --from-archive 
> flink-1.8.0-bin-scala_2.11.tgz --image-name flink-job:latest
> FLINK_DOCKER_IMAGE_NAME=flink-job 
> FLINK_JOB=org.apache.flink.streaming.examples.wordcount.WordCount 
> docker-compose up{code}
> which results in the following error:
> {code:java}
> job-cluster_1 | 2019-04-30 18:40:57,787 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Could not start 
> cluster entrypoint StandaloneJobClusterEntryPoint.
> job-cluster_1 | 
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
> initialize the cluster entrypoint StandaloneJobClusterEntryPoint.
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:190)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:535)
> job-cluster_1 | at 
> org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint.main(StandaloneJobClusterEntryPoint.java:105)
> job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not 
> create the DispatcherResourceManagerComponent.
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:257)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:224)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
> job-cluster_1 | at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
> job-cluster_1 | ... 2 more
> job-cluster_1 | Caused by: org.apache.flink.util.FlinkException: Could not 
> load the provided entrypoint class.
> job-cluster_1 | at 
> org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:119)
> job-cluster_1 | at 
> org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.retrieveJobGraph(ClassPathJobGraphRetriever.java:96)
> job-cluster_1 | at 
> org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:62)
> job-cluster_1 | at 
> org.apache.flink.runtime.dispatcher.JobDispatcherFactory.createDispatcher(JobDispatcherFactory.java:41)
> job-cluster_1 | at 
> org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:184)
> job-cluster_1 | ... 6 more
> job-cluster_1 | Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.streaming.examples.wordcount.WordCount
> job-cluster_1 | at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> job-cluster_1 | at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> job-cluster_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> job-cluster_1 | at 
> org.apache.flink.container.entrypoint.ClassPathJobGraphRetriever.createPackagedProgram(ClassPathJobGraphRetriever.java:116)
> job-cluster_1 | ... 10 more{code}
> This issue can be fixed by chown'ing the job.jar file to flink:flink in the 
> Dockerfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282321982
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   protected abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
 
 Review comment:
   Even though this is not a public API, I feel it should be more formal. 
Specifically, we could probably let this return true or false as the validation 
result rather than relying on throwing IllegalArgumentException. Secondly, 
validation can be more complicated than just checking the instance type; at 
this level, the base class shouldn't care what kind of validation the 
subclasses might do.
   
   I think it's acceptable if the base class doesn't care whether the subclass 
does validation or how it does it. So it's also okay to remove this and let 
each subclass do its own validation when creating/altering a table. That might 
be even cleaner.
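   For illustration only, the boolean-returning variant described above could 
look roughly like the following sketch; the package locations are assumed from 
the surrounding diff, and the class/method names are placeholders, not code 
from this PR:
   
```java
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;

/** Sketch of a base class that asks subclasses for a validation result. */
public abstract class ValidatingCatalogBase {

    /** Subclasses report whether they support the given table kind. */
    protected abstract boolean isValidCatalogBaseTable(CatalogBaseTable table);

    /** Central place to turn a negative validation result into an exception. */
    protected void checkTable(CatalogBaseTable table) {
        if (!isValidCatalogBaseTable(table)) {
            throw new CatalogException(
                "Unsupported table type: " + table.getClass().getName());
        }
    }
}
```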


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282322716
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), newTable);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282322682
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,196 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogBaseTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   validateCatalogBaseTable(newCatalogTable);
+
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   // TODO: [FLINK-12452] alterTable() in all 
catalogs should ensure existing base table and the new one are of the same type
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   

[GitHub] [flink] tianchen92 commented on a change in pull request #8266: [FLINK-9904]Allow users to control MaxDirectMemorySize

2019-05-08 Thread GitBox
tianchen92 commented on a change in pull request #8266: [FLINK-9904]Allow users 
to control MaxDirectMemorySize
URL: https://github.com/apache/flink/pull/8266#discussion_r282321092
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/config.sh
 ##
 @@ -429,6 +430,11 @@ if [ -z "${FLINK_TM_NET_BUF_MAX}" -o 
"${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
 FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
 fi
 
+# Define FLINK_TM_MAX_OFFHEAP_SIZE if it is not already set
+if [ -z "${FLINK_TM_MAX_OFFHEAP_SIZE}" ]; then
+# default: Long.MAX_VALUE in TB
+FLINK_TM_MAX_OFFHEAP_SIZE=$(readFromConfig ${KEY_TASKM_MAX_OFFHEAP_SIZE} 
"8388607T" "${YAML_CONF}")
 
 Review comment:
   I am not sure. In what case would ${FLINK_TM_MAX_OFFHEAP_SIZE} be equal to 
-1?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282320563
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +332,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map retrieveFlinkProperties(Map 
hiveTableParams) {
+   return hiveTableParams.entrySet().stream()
+   .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+   .collect(Collectors.toMap(e -> 
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+   }
+
+   /**
+* Add a prefix to Flink-created properties to distinguish them from 
Hive-created properties.
+*/
+   private Map maskFlinkProperties(Map 
properties) {
 
 Review comment:
   Static?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282320535
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +332,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map retrieveFlinkProperties(Map 
hiveTableParams) {
 
 Review comment:
   static?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12437) Taskmanager doesn't initiate registration after jobmanager marks it terminated

2019-05-08 Thread Abdul Qadeer (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836032#comment-16836032
 ] 

Abdul Qadeer commented on FLINK-12437:
--

[~rmetzger] I don't expect a fix to be made available for this in 1.4.0. I 
would like to know if this is a known issue fixed in newer versions. 

> Taskmanager doesn't initiate registration after jobmanager marks it terminated
> --
>
> Key: FLINK-12437
> URL: https://issues.apache.org/jira/browse/FLINK-12437
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Abdul Qadeer
>Priority: Major
>
> This issue is observed in Standalone cluster deployment mode with Zookeeper 
> HA enabled in Flink 1.4.0. A few taskmanagers restarted due to Out of 
> Metaspace.
>  The offending taskmanager `pipelineruntime-taskmgr-6789dd578b-dcp4r` first 
> successfully registers with jobmanager, and the remote watcher marks it 
> terminated soon after as seen in logs. There were other taskmanagers that 
> were terminated around same time but they had been quarantined by jobmanager 
> with message similar to:
> {noformat}
> Association to [akka.tcp://flink@10.60.5.121:8070] having UID [864976677] is 
> irrecoverably failed. UID is now quarantined and all messages to this UID 
> will be delivered to dead letters. Remote actorsystem must be restarted to 
> recover from this situation.
> {noformat}
> They came back up and successfully registered with jobmanager. This didn't 
> happen for the offending taskmanager:
>   
>  At JobManager:
> {noformat}
> {"timeMillis":1557073368155,"thread":"flink-akka.actor.default-dispatcher-49","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Registered
>  TaskManager at pipelineruntime-taskmgr-6789dd578b-dcp4r 
> (akka.tcp://flink@10.60.5.85:8070/user/taskmanager) as 
> ae61ac607f0ab35ab5066f7dc221e654. Current number of registered hosts is 8. 
> Current number of alive task slots is 
> 51.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":125,"threadPriority":5}
> ...
> ...
> {"timeMillis":1557073391386,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered
>  task manager /10.60.5.85. Number of registered task managers 7. Number of 
> available slots 
> 45.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5}
> ...
> ...
> {"timeMillis":1557073391483,"thread":"flink-akka.actor.default-dispatcher-82","level":"INFO","loggerName":"org.apache.flink.runtime.instance.InstanceManager","message":"Unregistered
>  task manager /10.60.5.85. Number of registered task managers 6. Number of 
> available slots 
> 39.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":159,"threadPriority":5}
> ...
> ...
> {"timeMillis":1557073370389,"thread":"flink-akka.actor.default-dispatcher-35","level":"INFO","loggerName":"akka.actor.LocalActorRef","message":"Message
>  [akka.remote.ReliableDeliverySupervisor$Ungate$] from 
> Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260]
>  to 
> Actor[akka://flink/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2Fflink%4010.60.5.85%3A8070-3#1863607260]
>  was not delivered. [22] dead letters encountered. This logging can be turned 
> off or adjusted with configuration settings 'akka.log-dead-letters' and 
> 'akka.log-dead-letters-during-shutdown'.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":98,"threadPriority":5}
> {noformat}
> At TaskManager:
> {noformat}
> {"timeMillis":1557073366068,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting
>  
> TaskManager","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
> {"timeMillis":1557073366073,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Starting
>  TaskManager actor system at 
> 10.60.5.85:8070.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
> {"timeMillis":1557073366077,"thread":"pool-2-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.taskmanager.TaskManager","message":"Trying
>  to start actor system at 
> 10.60.5.85:8070","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","threadId":40,"threadPriority":5}
> 

[GitHub] [flink] sunhaibotb edited a comment on issue #8124: [FLINK-11877] Implement the runtime handling of the InputSelectable interface

2019-05-08 Thread GitBox
sunhaibotb edited a comment on issue #8124: [FLINK-11877] Implement the runtime 
handling of the InputSelectable interface
URL: https://github.com/apache/flink/pull/8124#issuecomment-490388025
 
 
   @pnowojski , the code has been updated and it relies on the latest version 
of [PR-7959](https://github.com/apache/flink/pull/7959).
   
   - The throughput difference between `StreamTwoInputSelectableProcessor` and 
`StreamTwoInputProcessor` dropped from 3.51% to 2.88%.  That is to say,  
**`StreamTwoInputSelectableProcessor` is still 2.88% slower than 
`StreamTwoInputProcessor`**.
   
 - Benchmark results (the benchmark was run three times and averaged):
 `StreamTwoInputSelectableProcessor`: 23151 ops/ms
 `StreamTwoInputProcessor`: 23839 ops/ms
   
 - If the code that deserializes elements from the buffers is put into 
`StreamTwoInputSelectableProcessor#processInput()`, the performance improves 
greatly (I tested this before; see below for the code structure). But this 
deviates from the purpose of abstracting the `Input` interface.  **Is digging 
into JIT optimization the next direction, or is the performance regression 
acceptable under the current structure?**
   
   ```
   === StreamTwoInputXProcessor.java ===
   public boolean processInput() throws Exception {
while (true) {
...
if (currentRecordDeserializer != null) {
DeserializationResult result = 
currentRecordDeserializer.getNextRecord(deserializationDelegate1);
if (result.isFullRecord()) {
element = 
deserializationDelegate1.getInstance();
}
...
}
   }
   ```
   
   - The code related to `Input#listen()` is temporary and has not yet been 
rebased on your [PR-8361](https://github.com/apache/flink/pull/8361). 
Replacing listeners with `CompletableFuture` in the `InputGate`s is a very good 
design, and the old interface is really awkward for `Input`. **Next, should I 
put this PR on top of your 
[PR-8361](https://github.com/apache/flink/pull/8361)?**
   
   P.S., the benchmark code has been split into the new 
[PR-8368](https://github.com/apache/flink/pull/8368).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-08 Thread GitBox
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-490712462
 
 
   @wuchong can you also help take a look at the upgrade? Thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19

2019-05-08 Thread GitBox
walterddr commented on issue #8324: [FLINK-11921][table] Upgrade to calcite 1.19
URL: https://github.com/apache/flink/pull/8324#issuecomment-490712398
 
 
   @flinkbot attention @wuchong 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12392) FlinkRelMetadataQuery does not compile with Scala 2.12

2019-05-08 Thread Jark Wu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836018#comment-16836018
 ] 

Jark Wu commented on FLINK-12392:
-

Sure [~twalthr], we will figure it out today.

> FlinkRelMetadataQuery does not compile with Scala 2.12
> --
>
> Key: FLINK-12392
> URL: https://issues.apache.org/jira/browse/FLINK-12392
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0
>Reporter: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.9.0
>
>
> {code}
> 10:57:51.770 [ERROR] 
> /home/travis/build/apache/flink/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMetadataQuery.scala:52:
>  error: value EMPTY in class RelMetadataQuery cannot be accessed in object 
> org.apache.calcite.rel.metadata.RelMetadataQuery
> 10:57:51.770 [ERROR]  Access to protected value EMPTY not permitted because
> 10:57:51.770 [ERROR]  enclosing package metadata in package plan is not a 
> subclass of
> 10:57:51.770 [ERROR]  class RelMetadataQuery in package metadata where target 
> is defined
> 10:57:51.770 [ERROR] this(RelMetadataQuery.THREAD_PROVIDERS.get, 
> RelMetadataQuery.EMPTY)
> {code}
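
One possible workaround is sketched below; it is only an illustration, not necessarily the fix that will be applied. The idea is a small Java bridge class placed in the same package as RelMetadataQuery, since Java's protected access also grants package access. The return type assumes that EMPTY is itself a RelMetadataQuery, which is inferred from the constructor call in the error above but not confirmed by it.

{code:java}
// Sketch only: lives in Calcite's package so the protected EMPTY member is accessible here.
// The return type RelMetadataQuery is an assumption based on the constructor call in the error log.
package org.apache.calcite.rel.metadata;

public final class RelMetadataQueryBridge {

    private RelMetadataQueryBridge() {
    }

    /** Exposes RelMetadataQuery.EMPTY to code outside this package (e.g. Scala 2.12 callers). */
    public static RelMetadataQuery empty() {
        return RelMetadataQuery.EMPTY;
    }
}
{code}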



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] xuefuz commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on issue #8353: [FLINK-12233][hive] Support table related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-490709606
 
 
   > @xuefuz @lirui-apache thanks for your review! The build failure is 
unrelated (python api issue). If there's no more comment, I will merge tonight
   I think I will go thru one more time on this. Thanks.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-490695098
 
 
   Rebased to latest master to resolve conflicts. Will wait for a green build 
before proceeding


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12417) Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-08 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12417.

Resolution: Fixed

merged in 1.9.0: c954780c9fb2c0cc5b3730caff7e99bacbd534ba

> Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog 
> interface
> -
>
> Key: FLINK-12417
> URL: https://issues.apache.org/jira/browse/FLINK-12417
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> As discussed with [~dawidwys], the original purpose of separating 
> ReadableCatalog and ReadableWritableCatalog was to isolate access to metadata. 
> However, we believe access control and authorization are orthogonal to the design 
> of catalog APIs and should be a separate effort.
> Thus, we propose to merge ReadableCatalog and ReadableWritableCatalog to 
> simplify the design.
> cc [~twalthr] [~xuefuz]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] asfgit closed pull request #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-08 Thread GitBox
asfgit closed pull request #8365: [FLINK-12417][table] Unify ReadableCatalog 
and ReadableWritableCatalog interfaces to Catalog interface
URL: https://github.com/apache/flink/pull/8365
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-490692748
 
 
   @xuefuz @lirui-apache thanks for your review! The build failure is unrelated 
(python api issue). If there's no more comment, I will merge tonight


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-08 Thread GitBox
bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog 
and ReadableWritableCatalog interfaces to Catalog interface
URL: https://github.com/apache/flink/pull/8365#issuecomment-490692552
 
 
   @xuefuz thanks for your review! ReadableCatalog.java is already deleted. 
   
   Merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-08 Thread GitBox
xuefuz commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and 
ReadableWritableCatalog interfaces to Catalog interface
URL: https://github.com/apache/flink/pull/8365#issuecomment-490680574
 
 
   Yes. Let's delete ReadableCatalog.java and the PR is good to go. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on issue #8353: [FLINK-12233][hive] Support table related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-490680873
 
 
   ok. sounds good. thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 edited a comment on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 edited a comment on issue #8353: [FLINK-12233][hive] Support table 
related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-490654662
 
 
   > Thanks for refactoring the code. It looks much cleaner now.
   > 
   > However, I still noticed there are still a lot of commonality between the 
two implementations of createHiveTable() and createCatalogTable(). Maybe we can 
share more code between them and reduce the code duplication.
   
   @xuefuz they currently look similar mainly because HiveCatalog's 
implementation is only preliminary at the moment, as mentioned in the comment. E.g. 
it hasn't considered input/output formats and the serde lib yet. As we enrich 
HiveCatalog, I expect them to diverge a lot. It would be better to try to 
unify their common parts at the end.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-490654662
 
 
   > Thanks for refactoring the code. It looks much cleaner now.
   > 
   > However, I still noticed there are still a lot of commonality between the 
two implementations of createHiveTable() and createCatalogTable(). Maybe we can 
share more code between them and reduce the code duplication.
   
   @xuefuz they currently look similar mainly because HiveCatalog's 
implementation is only preliminary at the moment, as mentioned in the comment. As we 
enrich HiveCatalog, I expect them to diverge a lot. It would be better to try 
to unify their common parts at the end.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282251157
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), newTable);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+ 

[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282251084
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
 
 Review comment:
   The in-memory catalog also doesn't check that currently. We should solve it in 
"FLINK-12452: alterTable() in all catalogs should ensure the existing base table 
and the new one are of the same type"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12452) alterTable() should ensure existing base table and the new one are of the same type

2019-05-08 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12452:


 Summary: alterTable() should ensure existing base table and the 
new one are of the same type
 Key: FLINK-12452
 URL: https://issues.apache.org/jira/browse/FLINK-12452
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive, Table SQL / API
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


Currently, catalogs do not check whether the existing base table and the new one are of 
the same type in alterTable(). We should add that check to all catalogs.
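
A sketch of the kind of check this issue describes is shown below. The helper class and method names are made up for illustration, and the imports assume the package layout of the flink-table catalog classes referenced elsewhere in this thread.

{code:java}
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;

/** Illustration only: keep the "kind" of a table stable across alterTable() calls. */
final class AlterTableTypeCheck {

    private AlterTableTypeCheck() {
    }

    static void ensureSameKind(CatalogBaseTable existingTable, CatalogBaseTable newTable) {
        // Comparing the concrete classes is the simplest way to prevent e.g.
        // replacing a table with a view (or vice versa) through alterTable().
        if (existingTable.getClass() != newTable.getClass()) {
            throw new CatalogException(String.format(
                "Table types don't match. Existing table is '%s' and new table is '%s'.",
                existingTable.getClass().getName(), newTable.getClass().getName()));
        }
    }
}
{code}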



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282244476
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   public abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
+   throws IllegalArgumentException;
+
+   /**
+* Create a CatalogBaseTable from a Hive table.
+*
+* @param hiveTable a Hive table
+* @return a CatalogBaseTable
+*/
+   public abstract CatalogBaseTable createCatalogTable(Table hiveTable);
+
+   /**
+* Create a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table a CatalogBaseTable
+* @return a Hive table
+*/
+   public abstract Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table);
 
 Review comment:
   I feel "create" is better because it has more complexity than just simply 
convert one object to another and keep naming convention consistent. Also feels 
a bit weird to "convert" two things into one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282239896
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +333,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map retrieveFlinkProperties(Map 
hiveTableParams) {
+   return hiveTableParams.entrySet().stream()
+   .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+   .collect(Collectors.toMap(e -> 
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+   }
+
+   /**
+* Add a prefix to Flink-created properties to distinguish them from 
Hive-created properties.
+*/
+   private Map buildFlinkProperties(Map 
properties) {
 
 Review comment:
   I feel "mask" is a good name. Not sure about "unmask" because the retrieval 
not only unmasks flink properties but also filters out hive properties.
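
For illustration, a minimal sketch of the mask/unmask pair discussed here is shown below. The prefix value is an assumption made for the example; the PR only shows the constant name FLINK_PROPERTY_PREFIX, and this is not the code from the PR itself.

```java
import java.util.Map;
import java.util.stream.Collectors;

final class FlinkPropertyMasking {

    // Assumed value for illustration; the actual constant is defined in the PR.
    private static final String FLINK_PROPERTY_PREFIX = "flink.";

    private FlinkPropertyMasking() {
    }

    /** Prefix Flink-created properties before storing them as Hive table parameters. */
    static Map<String, String> maskFlinkProperties(Map<String, String> properties) {
        return properties.entrySet().stream()
            .collect(Collectors.toMap(e -> FLINK_PROPERTY_PREFIX + e.getKey(), Map.Entry::getValue));
    }

    /** Keep only the prefixed entries and strip the prefix, dropping Hive-created parameters. */
    static Map<String, String> unmaskFlinkProperties(Map<String, String> hiveTableParams) {
        return hiveTableParams.entrySet().stream()
            .filter(e -> e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
            .collect(Collectors.toMap(
                e -> e.getKey().substring(FLINK_PROPERTY_PREFIX.length()), Map.Entry::getValue));
    }
}
```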


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282238893
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
// -- tables and views--
 
@Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   try {
-   client.dropTable(
-   tablePath.getDatabaseName(),
-   tablePath.getObjectName(),
-   // Indicate whether associated data should be 
deleted.
-   // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
-   true,
-   ignoreIfNotExists);
-   } catch (NoSuchObjectException e) {
-   if (!ignoreIfNotExists) {
-   throw new TableNotExistException(catalogName, 
tablePath);
-   }
-   } catch (TException e) {
-   throw new CatalogException(
-   String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+   public void validateCatalogBaseTable(CatalogBaseTable table) throws 
IllegalArgumentException {
+   // TODO: invalidate HiveCatalogView
+   if (table instanceof HiveCatalogTable) {
+   throw new IllegalArgumentException(
+   "Please use HiveCatalog to operate on 
HiveCatalogTable and HiveCatalogView.");
}
}
 
@Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   try {
-   // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
-   if (tableExists(tablePath)) {
-   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
-   // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
-   if (tableExists(newPath)) {
-   throw new 
TableAlreadyExistException(catalogName, newPath);
-   } else {
-   Table table = getHiveTable(tablePath);
-   table.setTableName(newTableName);
-   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
-   }
-   } else if (!ignoreIfNotExists) {
-   throw new TableNotExistException(catalogName, 
tablePath);
-   }
-   } catch (TException e) {
-   throw new CatalogException(
-   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
-   }
-   }
+   public CatalogBaseTable createCatalogTable(Table hiveTable) {
+   // Table schema
+   TableSchema tableSchema = createTableSchema(
+   hiveTable.getSd().getCols(), 
hiveTable.getPartitionKeys());
 
-   @Override
-   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-   if (!databaseExists(tablePath.getDatabaseName())) {
-   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
-   } else {
-   try {
-   
client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, 
table));
-   } catch (AlreadyExistsException e) {
-   if (!ignoreIfExists) {
-   throw new 
TableAlreadyExistException(catalogName, tablePath);
-   }
-   } catch (TException e) {
-   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
-   }
+   // Table properties
+   Map 

[GitHub] [flink] bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-08 Thread GitBox
bowenli86 commented on issue #8365: [FLINK-12417][table] Unify ReadableCatalog 
and ReadableWritableCatalog interfaces to Catalog interface
URL: https://github.com/apache/flink/pull/8365#issuecomment-490639242
 
 
   > Changes look good to me. However, I wonder if we should get rid of the 
ReadableCatalog class.
   
   Thanks for your review. I think in the future there might be a 
ReadOnlyCatalog or similar interfaces. We can create that when necessary, and I 
don't think we need it right now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates

2019-05-08 Thread GitBox
StefanRRichter commented on a change in pull request #8361: 
[FLINK-12434][network] Replace listeners with CompletableFuture in InputGates
URL: https://github.com/apache/flink/pull/8361#discussion_r282212329
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/SetArrayDeque.java
 ##
 @@ -16,20 +16,38 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.consumer;
+package org.apache.flink.runtime.util;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
- * Listener interface implemented by consumers of {@link InputGate} instances
- * that want to be notified of availability of buffer or event instances.
+ * Wrapper of {@link ArrayDeque} with quick {@link HashSet}'s based {@link 
#contains(Object)} method.
  */
-public interface InputGateListener {
+public class SetArrayDeque {
+   private final ArrayDeque deque = new ArrayDeque<>();
+   private final Set set = new HashSet<>();
+
+   public boolean add(T element) {
+   return deque.add(element) || set.add(element);
 
 Review comment:
   It is not only about the return value; even worse, you can add duplicates 
to the deque because it ignores that the value may already be in the set. So 
this is not set semantics anymore.
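
For illustration, a minimal sketch of what set semantics for this wrapper could look like is shown below. The names follow the quoted diff; this is only one possible shape, not the final implementation in the PR.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

/** Sketch: an ArrayDeque guarded by a HashSet so duplicates never enter the deque. */
public class SetArrayDeque<T> {

    private final ArrayDeque<T> deque = new ArrayDeque<>();
    private final Set<T> set = new HashSet<>();

    public boolean add(T element) {
        // Only touch the deque when the set did not already contain the element,
        // so both the return value and the deque contents keep set semantics.
        if (set.add(element)) {
            deque.add(element);
            return true;
        }
        return false;
    }

    public T poll() {
        T element = deque.poll();
        if (element != null) {
            set.remove(element);
        }
        return element;
    }

    public boolean contains(T element) {
        return set.contains(element);
    }
}
```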


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] StefanRRichter commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates

2019-05-08 Thread GitBox
StefanRRichter commented on a change in pull request #8361: 
[FLINK-12434][network] Replace listeners with CompletableFuture in InputGates
URL: https://github.com/apache/flink/pull/8361#discussion_r282213460
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -596,6 +600,12 @@ public void requestPartitions() throws IOException, 
InterruptedException {
}
}
 
+   private void resetIsAvailable() {
+   if (isAvailable.isDone()) {
 
 Review comment:
   Yes, the `||` version makes sense. The suggestion was just a shortcut 
version.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282206462
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
+   if (!tableExists(tablePath)) {
+   if (!ignoreIfNotExists) {
+   throw new 
TableNotExistException(catalogName, tablePath);
+   }
+   } else {
+   Table newTable = createHiveTable(tablePath, 
newCatalogTable);
+
+   // client.alter_table() requires a valid 
location
+   // thus, if new table doesn't have that, it 
reuses location of the old table
+   if (!newTable.getSd().isSetLocation()) {
+   Table oldTable = 
getHiveTable(tablePath);
+   
newTable.getSd().setLocation(oldTable.getSd().getLocation());
+   }
+
+   client.alter_table(tablePath.getDatabaseName(), 
tablePath.getObjectName(), newTable);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282205173
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
 
 Review comment:
   Do we need to validate newCatalogTable?
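   
   For reference, a sketch of how that could look, mirroring the validateCatalogBaseTable(table) call that createTable() already makes in this diff (only the start of the method is shown; the rest stays as in the diff above):
   
   ```java
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean ignoreIfNotExists)
           throws TableNotExistException, CatalogException {
       // Validate the new table up front, just like createTable() does above.
       validateCatalogBaseTable(newCatalogTable);

       // ... existence check and client.alter_table() call unchanged from the diff above ...
   }
   ```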


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282203125
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   public abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
 
 Review comment:
   Use "protected" if we don't expect external callers.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282205441
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -197,4 +237,193 @@ protected void alterHiveDatabase(String name, Database 
newHiveDatabase, boolean
throw new CatalogException(String.format("Failed to 
alter database %s", name), e);
}
}
+
+   // -- tables --
+
+   @Override
+   public CatalogBaseTable getTable(ObjectPath tablePath)
+   throws TableNotExistException, CatalogException {
+   return createCatalogTable(getHiveTable(tablePath));
+   }
+
+   @Override
+   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
+   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
+   validateCatalogBaseTable(table);
+
+   if (!databaseExists(tablePath.getDatabaseName())) {
+   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
+   } else {
+   try {
+   client.createTable(createHiveTable(tablePath, 
table));
+   } catch (AlreadyExistsException e) {
+   if (!ignoreIfExists) {
+   throw new 
TableAlreadyExistException(catalogName, tablePath);
+   }
+   } catch (TException e) {
+   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
+   }
+   }
+   }
+
+   @Override
+   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
+   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
+   try {
+   // alter_table() doesn't throw a clear exception when 
target table doesn't exist.
+   // Thus, check the table existence explicitly
+   if (tableExists(tablePath)) {
+   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
+   // alter_table() doesn't throw a clear 
exception when new table already exists.
+   // Thus, check the table existence explicitly
+   if (tableExists(newPath)) {
+   throw new 
TableAlreadyExistException(catalogName, newPath);
+   } else {
+   Table table = getHiveTable(tablePath);
+   table.setTableName(newTableName);
+   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
+   }
+   } else if (!ignoreIfNotExists) {
+   throw new TableNotExistException(catalogName, 
tablePath);
+   }
+   } catch (TException e) {
+   throw new CatalogException(
+   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
+   }
+   }
+
+   @Override
+   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists)
+   throws TableNotExistException, CatalogException {
+   try {
 
 Review comment:
   Also, how do we ensure there is no alter from table to view and vice versa?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282203601
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   public abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
+   throws IllegalArgumentException;
+
+   /**
+* Create a CatalogBaseTable from a Hive table.
+*
+* @param hiveTable a Hive table
+* @return a CatalogBaseTable
+*/
+   public abstract CatalogBaseTable createCatalogTable(Table hiveTable);
+
+   /**
+* Create a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table a CatalogBaseTable
+* @return a Hive table
+*/
+   public abstract Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table);
 
 Review comment:
   Same as above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282203452
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 ##
 @@ -89,6 +101,34 @@ private static IMetaStoreClient 
getMetastoreClient(HiveConf hiveConf) {
}
}
 
+   // -- APIs --
+
+   /**
+* Validate input base table.
+*
+* @param catalogBaseTable the base table to be validated
+* @throws IllegalArgumentException thrown if the input base table is 
invalid.
+*/
+   public abstract void validateCatalogBaseTable(CatalogBaseTable 
catalogBaseTable)
+   throws IllegalArgumentException;
+
+   /**
+* Create a CatalogBaseTable from a Hive table.
+*
+* @param hiveTable a Hive table
+* @return a CatalogBaseTable
+*/
+   public abstract CatalogBaseTable createCatalogTable(Table hiveTable);
 
 Review comment:
   Can we use the word "convertTo" instead of "create"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282201837
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -363,4 +333,23 @@ public CatalogColumnStatistics 
getPartitionColumnStatistics(ObjectPath tablePath
throw new UnsupportedOperationException();
}
 
+   // -- utils --
+
+   /**
+* Filter out Hive-created properties, and return Flink-created 
properties.
+*/
+   private Map retrieveFlinkProperties(Map 
hiveTableParams) {
+   return hiveTableParams.entrySet().stream()
+   .filter(e -> 
e.getKey().startsWith(FLINK_PROPERTY_PREFIX))
+   .collect(Collectors.toMap(e -> 
e.getKey().replace(FLINK_PROPERTY_PREFIX, ""), e -> e.getValue()));
+   }
+
+   /**
+* Add a prefix to Flink-created properties to distinguish them from 
Hive-created properties.
+*/
+   private Map buildFlinkProperties(Map 
properties) {
 
 Review comment:
   Maybe we can use the words "mask" and "unmask" instead of "build" and "retrieve".


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282199916
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
// -- tables and views--
 
@Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   try {
-   client.dropTable(
-   tablePath.getDatabaseName(),
-   tablePath.getObjectName(),
-   // Indicate whether associated data should be 
deleted.
-   // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
-   true,
-   ignoreIfNotExists);
-   } catch (NoSuchObjectException e) {
-   if (!ignoreIfNotExists) {
-   throw new TableNotExistException(catalogName, 
tablePath);
-   }
-   } catch (TException e) {
-   throw new CatalogException(
-   String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+   public void validateCatalogBaseTable(CatalogBaseTable table) throws 
IllegalArgumentException {
+   // TODO: invalidate HiveCatalogView
+   if (table instanceof HiveCatalogTable) {
+   throw new IllegalArgumentException(
+   "Please use HiveCatalog to operate on 
HiveCatalogTable and HiveCatalogView.");
}
}
 
@Override
-   public void renameTable(ObjectPath tablePath, String newTableName, 
boolean ignoreIfNotExists)
-   throws TableNotExistException, 
TableAlreadyExistException, CatalogException {
-   try {
-   // alter_table() doesn't throw a clear exception when 
target table doesn't exist. Thus, check the table existence explicitly
-   if (tableExists(tablePath)) {
-   ObjectPath newPath = new 
ObjectPath(tablePath.getDatabaseName(), newTableName);
-   // alter_table() doesn't throw a clear 
exception when new table already exists. Thus, check the table existence 
explicitly
-   if (tableExists(newPath)) {
-   throw new 
TableAlreadyExistException(catalogName, newPath);
-   } else {
-   Table table = getHiveTable(tablePath);
-   table.setTableName(newTableName);
-   
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), 
table);
-   }
-   } else if (!ignoreIfNotExists) {
-   throw new TableNotExistException(catalogName, 
tablePath);
-   }
-   } catch (TException e) {
-   throw new CatalogException(
-   String.format("Failed to rename table %s", 
tablePath.getFullName()), e);
-   }
-   }
+   public CatalogBaseTable createCatalogTable(Table hiveTable) {
+   // Table schema
+   TableSchema tableSchema = createTableSchema(
+   hiveTable.getSd().getCols(), 
hiveTable.getPartitionKeys());
 
-   @Override
-   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-   throws TableAlreadyExistException, 
DatabaseNotExistException, CatalogException {
-   if (!databaseExists(tablePath.getDatabaseName())) {
-   throw new DatabaseNotExistException(catalogName, 
tablePath.getDatabaseName());
-   } else {
-   try {
-   
client.createTable(GenericHiveMetastoreCatalogUtil.createHiveTable(tablePath, 
table));
-   } catch (AlreadyExistsException e) {
-   if (!ignoreIfExists) {
-   throw new 
TableAlreadyExistException(catalogName, tablePath);
-   }
-   } catch (TException e) {
-   throw new 
CatalogException(String.format("Failed to create table %s", 
tablePath.getFullName()), e);
-   }
+   // Table properties
+   Map properties = 

[GitHub] [flink] xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
xuefuz commented on a change in pull request #8353: [FLINK-12233][hive] Support 
table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282198787
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -95,144 +106,103 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
// -- tables and views--
 
@Override
-   public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
-   throws TableNotExistException, CatalogException {
-   try {
-   client.dropTable(
-   tablePath.getDatabaseName(),
-   tablePath.getObjectName(),
-   // Indicate whether associated data should be 
deleted.
-   // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
-   true,
-   ignoreIfNotExists);
-   } catch (NoSuchObjectException e) {
-   if (!ignoreIfNotExists) {
-   throw new TableNotExistException(catalogName, 
tablePath);
-   }
-   } catch (TException e) {
-   throw new CatalogException(
-   String.format("Failed to drop table %s", 
tablePath.getFullName()), e);
+   public void validateCatalogBaseTable(CatalogBaseTable table) throws 
IllegalArgumentException {
+   // TODO: invalidate HiveCatalogView
+   if (table instanceof HiveCatalogTable) {
 
 Review comment:
   I'm wondering if we should specify what we are expecting and throw an 
exception if the condition isn't met.
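   
   For illustration, the "state what we expect" style could look like the sketch below. GenericCatalogTable is only a placeholder name here, since the quoted diff does not show which concrete classes this catalog is meant to accept.
   
   ```java
   @Override
   public void validateCatalogBaseTable(CatalogBaseTable table) throws IllegalArgumentException {
       // Whitelist style: name the expected type and reject everything else,
       // instead of only rejecting the known-bad HiveCatalogTable case.
       if (!(table instanceof GenericCatalogTable)) {   // placeholder for the expected type
           throw new IllegalArgumentException(String.format(
               "GenericHiveMetastoreCatalog only supports the expected generic table type, but got %s.",
               table.getClass().getName()));
       }
   }
   ```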


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes

2019-05-08 Thread Henrik (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835800#comment-16835800
 ] 

Henrik edited comment on FLINK-12381 at 5/8/19 6:34 PM:


Yes, you can see it like that (a new cluster), I suppose.

So does that mean that flink is useless without HA then? Because if I don't 
have HA, and the node I'm running it on, or the k8s pod I'm running it in, 
restarts, it's a new cluster?

Ideally, I would not have to manually change the specification of the running job 
unless the job itself had changed. I.e. having to manually change the jobid whenever 
the pod is restarted goes against the declarative management of resources in a k8s 
cluster.


was (Author: haf):
Yes, you can see it like that (a new cluster), I suppose.

So does that mean that flink is useless without HA then? Because if I don't 
have HA, and the node I'm running it on, or the k8s pod I'm running it in, 
restarts, it's a new cluster?

> W/o HA, upon a full restart, checkpointing crashes
> --
>
> Key: FLINK-12381
> URL: https://issues.apache.org/jira/browse/FLINK-12381
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0
> Environment: Same as FLINK-12379, FLINK-12377, FLINK-12376
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://example_bucket/flink/checkpoints//chk-16/_metadata'
>  already exists
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.(GoogleHadoopOutputStream.java:74)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.(FsCheckpointMetadataOutputStream.java:65)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> {code}
> Instead, it should either just overwrite the checkpoint or fail to start the 
> job completely. Partial and undefined failure is not what should happen.
>  
> Repro:
>  # Set up a single purpose job cluster (which could use much better docs btw!)
>  # Let it run with GCS checkpointing for a while with rocksdb/gs://example
>  # Kill it
>  # Start it



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12379) Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint

2019-05-08 Thread Henrik (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835505#comment-16835505
 ] 

Henrik edited comment on FLINK-12379 at 5/8/19 6:29 PM:


Yes, it's the "standalone-job.sh" entrypoint. [~StephanEwen]

AFAIK it's the recommended way to deploy a standalone job?


was (Author: haf):
Yes, it's the "standalone-job.sh" entrypoint. [~StephanEwen]

> Parallelism in job/GCS/Hadoop: Could not finalize the pending checkpoint
> 
>
> Key: FLINK-12379
> URL: https://issues.apache.org/jira/browse/FLINK-12379
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems, Runtime / Coordination
>Affects Versions: 1.8.0
> Environment: GCS +
>  
> {code:java}
> 1.8.0
> 1.8
> 2.11{code}
> {code:java}
> 
> 
> 
> 
>   com.google.cloud.bigdataoss
>   gcs-connector
>   hadoop2-1.9.16
> 
> 
>   org.apache.flink
>   flink-connector-filesystem_2.11
>   ${flink.version}
> 
> 
>   org.apache.flink
>   flink-hadoop-fs
>   ${flink.version}
> 
> 
> 
>   org.apache.flink
>   flink-shaded-hadoop2
>   ${hadoop.version}-${flink.version}
> 
> {code}
>  
>  
>Reporter: Henrik
>Priority: Major
>
> When running one standalone-job w/ parallelism=1 + one taskmanager, you will 
> shortly get this crash
> {code:java}
> 2019-04-30 22:20:02,928 WARN  org.apache.flink.runtime.jobmaster.JobMaster
>   - Error while processing checkpoint acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 5.
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:837)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://example_bucket/flink/checkpoints//chk-5/_metadata'
>  already exists
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> Caused by: java.nio.file.FileAlreadyExistsException: Object 
> gs://example_bucket/flink/checkpoints//chk-5/_metadata
>  already exists.
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.getWriteGeneration(GoogleCloudStorageImpl.java:1918)
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.create(GoogleCloudStorageImpl.java:410)
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.createInternal(GoogleCloudStorageFileSystem.java:286)
>     at 
> com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.create(GoogleCloudStorageFileSystem.java:264)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:82)
>     ... 19 more
> 2019-04-30 22:20:03,114 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 6 @ 

[jira] [Commented] (FLINK-12381) W/o HA, upon a full restart, checkpointing crashes

2019-05-08 Thread Henrik (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835800#comment-16835800
 ] 

Henrik commented on FLINK-12381:


Yes, you can see it like that (a new cluster), I suppose.

So does that mean that Flink is useless without HA then? Because if I don't 
have HA, and the node I'm running it on, or the k8s pod I'm running it in, 
restarts, it's a new cluster?

> W/o HA, upon a full restart, checkpointing crashes
> --
>
> Key: FLINK-12381
> URL: https://issues.apache.org/jira/browse/FLINK-12381
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Coordination
>Affects Versions: 1.8.0
> Environment: Same as FLINK-12379, FLINK-12377 and FLINK-12376
>Reporter: Henrik
>Priority: Major
>
> {code:java}
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://example_bucket/flink/checkpoints//chk-16/_metadata'
>  already exists
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.createChannel(GoogleHadoopOutputStream.java:85)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream.<init>(GoogleHadoopOutputStream.java:74)
>     at 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.create(GoogleHadoopFileSystemBase.java:797)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:807)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:141)
>     at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:37)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:65)
>     at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:104)
>     at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:259)
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:829)
>     ... 8 more
> {code}
> Instead, it should either just overwrite the checkpoint or fail to start the 
> job completely. Partial and undefined failure is not what should happen.
>  
> Repro:
>  # Set up a single purpose job cluster (which could use much better docs btw!)
>  # Let it run with GCS checkpointing for a while with rocksdb/gs://example
>  # Kill it
>  # Start it



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] kisimple commented on issue #7757: [FLINK-11630] Triggers the termination of all running Tasks when shutting down TaskExecutor

2019-05-08 Thread GitBox
kisimple commented on issue #7757: [FLINK-11630] Triggers the termination of 
all running Tasks when shutting down TaskExecutor
URL: https://github.com/apache/flink/pull/7757#issuecomment-490593540
 
 
   Hi @azagrebin, sorry for the delay. I have addressed your comments, and I 
haven't reproduced the original issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on a change in pull request #8353: [FLINK-12233][hive] 
Support table related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#discussion_r282181650
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -42,23 +46,35 @@
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * A catalog that persists all Flink streaming and batch metadata by using 
Hive metastore as a persistent storage.
  */
 public class GenericHiveMetastoreCatalog extends HiveCatalogBase {
private static final Logger LOG = 
LoggerFactory.getLogger(GenericHiveMetastoreCatalog.class);
 
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+   // Flink tables should be stored as 'external' tables in Hive metastore
 
 Review comment:
   You are right, I verified it locally. I think using Hive managed tables 
would be fine here. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related operations in HiveCatalog

2019-05-08 Thread GitBox
bowenli86 commented on issue #8353: [FLINK-12233][hive] Support table related 
operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8353#issuecomment-490590162
 
 
   @xuefuz @lirui-apache Thanks for your review. Please take another look.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate Savepoints page into Chinese

2019-05-08 Thread GitBox
XuQianJin-Stars commented on issue #8300: [FLINK-11638][docs-zh] Translate 
Savepoints page into Chinese
URL: https://github.com/apache/flink/pull/8300#issuecomment-490584204
 
 
   Hi @klion26, thank you for your review. I have modified it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10455) Potential Kafka producer leak in case of failures

2019-05-08 Thread Chris Slotterback (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835749#comment-16835749
 ] 

Chris Slotterback commented on FLINK-10455:
---

The way I was able to reproduce this was by forcing the job into a failed state 
by timing out while writing a checkpoint to the filesystem. We occasionally see 
latency spikes in our environment that result in these job restarts. Most of the 
jobs recover fine, but when using exactly-once semantics for the Kafka producer, 
the job gets stuck in this loop. My assumption is that any job failure will reach 
the client's processDisconnection method after the class loader is gone.
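
The class-loader theory is easiest to see in a small, purely hypothetical sketch. Nothing below is Flink or Kafka code: the jar path and the com.example.FakeSender class are invented for illustration, and the sketch only shows the shape of the race, i.e. a helper thread started from classes in a child class loader that keeps running after that loader has been closed.

{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderRaceSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical user-code jar; stands in for the job's user class loader.
        URL[] userJar = { new URL("file:/tmp/user-job.jar") };
        URLClassLoader userLoader = new URLClassLoader(userJar, null);

        // A long-lived helper thread (like the Kafka producer's network thread)
        // built from classes in the user jar; FakeSender is assumed to implement Runnable.
        Class<?> senderClass = userLoader.loadClass("com.example.FakeSender");
        Thread sender = new Thread((Runnable) senderClass.getDeclaredConstructor().newInstance());
        sender.start();

        // The task fails and its user class loader is closed/released while the
        // helper thread is still running.
        userLoader.close();

        // Any class the sender still needs to load lazily (e.g. an anonymous inner
        // class such as NetworkClient$1) can no longer be resolved, which surfaces
        // as a NoClassDefFoundError inside the disconnect path.
        sender.join();
    }
}
{code}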

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
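
For reference, the behaviour the issue says is missing (closing the producer once it has been fenced, as the Kafka documentation asks) looks roughly like the sketch below. This is not the FlinkKafkaProducer011 implementation, just the general pattern expressed with the plain Kafka client API.

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.errors.ProducerFencedException;

public final class FencedProducerHandling {

    // Minimal sketch: commit the transaction, and if the producer has been
    // fenced, close it instead of keeping it around for reuse.
    static <K, V> void commitOrClose(KafkaProducer<K, V> producer) {
        try {
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // A fenced producer can never be used again; closing it releases
            // its network thread and buffers instead of leaking them.
            producer.close();
            throw e;
        }
    }
}
{code}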



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-12448) FlinkKafkaProducer late closure after class loader

2019-05-08 Thread Chris Slotterback (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Slotterback closed FLINK-12448.
-
Resolution: Duplicate

> FlinkKafkaProducer late closure after class loader
> --
>
> Key: FLINK-12448
> URL: https://issues.apache.org/jira/browse/FLINK-12448
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Chris Slotterback
>Priority: Major
>
> During job failure/restart, FlinkKafkaProducer configured with 
> Semantic.EXACTLY_ONCE fails to disconnect properly due to a 
> NoClassDefFoundError:
> {noformat}
> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
> at 
> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
> at 
> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> at java.lang.Thread.run(Thread.java:748){noformat}
>  
> This begins a restart loop where the job never recovers properly. This is 
> reproducible only with EXACTLY_ONCE semantic, AT_LEAST_ONCE properly 
> disconnects and restarts without error.
> This issue is described in FLINK-10455, which has since been marked as fixed, 
> but it is still reproducible.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12451) [Bitwise Functions] Add BIT_XOR function supported in Table API and SQL

2019-05-08 Thread Zhanchun Zhang (JIRA)
Zhanchun Zhang created FLINK-12451:
--

 Summary: [Bitwise Functions] Add BIT_XOR function supported in 
Table API and SQL
 Key: FLINK-12451
 URL: https://issues.apache.org/jira/browse/FLINK-12451
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhanchun Zhang
Assignee: Zhanchun Zhang


Bitwise XOR; returns an unsigned 64-bit integer.

e.g.:
SELECT 1 ^ 1; returns 0

SELECT 1 ^ 0; returns 1

SELECT 11 ^ 3; returns 8
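
Illustration only (plain Java, not the proposed Table API / SQL implementation): the expected values above, plus one way the "unsigned 64-bit" rendering could be expressed on Java's signed longs.

{code:java}
public class BitXorSemantics {
    public static void main(String[] args) {
        System.out.println(1L ^ 1L);   // 0
        System.out.println(1L ^ 0L);   // 1
        System.out.println(11L ^ 3L);  // 8

        // Java longs are signed; printing the result as unsigned matches the
        // "unsigned 64-bit integer" wording above, e.g. for -1 ^ 0:
        System.out.println(Long.toUnsignedString(-1L ^ 0L)); // 18446744073709551615
    }
}
{code}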



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12450) [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions supported in Table API and SQL

2019-05-08 Thread Zhanchun Zhang (JIRA)
Zhanchun Zhang created FLINK-12450:
--

 Summary: [Bitwise Functions] Add BIT_LSHIFT, BIT_RSHIFT functions 
supported in Table API and SQL
 Key: FLINK-12450
 URL: https://issues.apache.org/jira/browse/FLINK-12450
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhanchun Zhang
Assignee: Zhanchun Zhang


BIT_LSHIFT shifts a long number to the left.

BIT_RSHIFT shifts a long number to the right.
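
Again in plain Java for illustration (assuming BIT_LSHIFT(x, n) and BIT_RSHIFT(x, n) mean shifting x by n bits; this is not the proposed implementation). Note that for the right shift the implementation will have to choose between Java's sign-preserving >> and the logical >>>, which differ for negative inputs.

{code:java}
public class BitShiftSemantics {
    public static void main(String[] args) {
        System.out.println(1L << 3);    // left shift:  1 << 3  -> 8
        System.out.println(16L >> 2);   // right shift: 16 >> 2 -> 4

        // For negative numbers the two right shifts disagree:
        System.out.println(-16L >> 2);   // -4 (arithmetic, sign-preserving)
        System.out.println(-16L >>> 2);  // 4611686018427387900 (logical, unsigned-style)
    }
}
{code}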



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12449) [Bitwise Functions] Add BIT_AND, BIT_OR functions supported in Table API and SQL

2019-05-08 Thread Zhanchun Zhang (JIRA)
Zhanchun Zhang created FLINK-12449:
--

 Summary: [Bitwise Functions] Add BIT_AND,  BIT_OR functions 
supported in Table API and SQL
 Key: FLINK-12449
 URL: https://issues.apache.org/jira/browse/FLINK-12449
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Zhanchun Zhang
Assignee: Zhanchun Zhang


Bitwise AND.

e.g. SELECT BIT_AND(29, 15) returns 13.

Bitwise OR.

e.g. SELECT BIT_OR(29, 15) returns 31.
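
The same examples checked in plain Java (illustration only, not the Flink implementation):

{code:java}
public class BitAndOrSemantics {
    public static void main(String[] args) {
        System.out.println(29 & 15); // 13, matching SELECT BIT_AND(29, 15)
        System.out.println(29 | 15); // 31, matching SELECT BIT_OR(29, 15)
    }
}
{code}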




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-08 Thread GitBox
zhijiangW commented on issue #8242: [FLINK-6227][network] Introduce the 
DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#issuecomment-490529494
 
 
   Thanks for the further review @tillrohrmann. Let me try to explain the two concerns above.
   
   - Whether to always transform `IOException` into `PartitionNotFoundException`: I would actually like to do this wrapping in the constructor of `SpilledSubpartitionView`, which can throw an `IOException` while opening the disk file. Based on the current code, this is the only place that can throw an `IOException` while creating a `ResultSubpartitionView`. But considering that `SpillableSubpartition` and its reader view will soon be removed by Stephan's new `BoundedBlockingSubpartition`, I did the wrapping the way it is in the PR, which might also be suitable for the new `BoundedBlockingSubpartition`. Strictly speaking, it would be better to transform the exception in the specific step instead of covering the whole process. If you have concerns about this, I could move it to the internal step even though it will be removed soon, or we could leave case `c` aside until `BoundedBlockingSubpartition` is merged.
   
   - Whether to check on the producer side that the file exists and throw `PartitionNotFoundException` there: I think the check could only be done on the producer side while opening the file. Maybe there are other options, but I have not thought of any yet. However, the producer might not throw `PartitionNotFoundException` after finding that the file does not exist. Another option is for the producer to throw a dedicated exception, say `PartitionOpenException`, which would be sent to the consumer via `ErrorResponse` over the network. The consumer would then fail directly and wrap it into a `PartitionNotFoundException` for the JM. That way we could avoid checking the partition state on the consumer side when it receives a `PartitionNotFoundException`, but we would need to define another new exception. My current approach in the PR is simple, and for blocking partitions the consumer could also fail directly after receiving `PartitionNotFoundException`, as I mentioned before.
   
   I could create a JIRA ticket for the connection issue later.
   
   I have not focused on the unit tests yet, because they depend on the approach we want to implement. Once you approve the current approach, I will fix the existing tests and add new tests to cover the cases.
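
To make the first option above concrete, the wrapping being discussed is roughly the following sketch. The types here are simplified, hypothetical stand-ins (the real PartitionNotFoundException is built from the ResultPartitionID of the missing partition, and SpilledSubpartitionView opens a buffer file reader rather than a plain byte channel); the sketch only shows where the IOException would be translated.

{code:java}
import java.io.IOException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

// Simplified stand-in for Flink's PartitionNotFoundException.
class PartitionNotFoundException extends IOException {
    PartitionNotFoundException(String partitionId, Throwable cause) {
        super("Partition " + partitionId + " not found.", cause);
    }
}

class SpilledViewFactory {
    // The constructor-level wrapping discussed above: an IOException while opening
    // the spilled file is reported as "partition not found", so the consumer/JM can
    // handle a vanished file the same way as a missing partition.
    static SeekableByteChannel openReaderView(String partitionId, Path spillFile)
            throws PartitionNotFoundException {
        try {
            return Files.newByteChannel(spillFile);
        } catch (IOException e) {
            throw new PartitionNotFoundException(partitionId, e);
        }
    }
}
{code}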


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10455) Potential Kafka producer leak in case of failures

2019-05-08 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10455:
--
Fix Version/s: (was: 1.6.3)
   1.8.1
   1.9.0
   1.7.3

> Potential Kafka producer leak in case of failures
> -
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we 
> may get a {{ProducerFencedException}}. Documentation around 
> {{ProducerFencedException}} explicitly states that we should close the 
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] pnowojski commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates

2019-05-08 Thread GitBox
pnowojski commented on a change in pull request #8361: [FLINK-12434][network] 
Replace listeners with CompletableFuture in InputGates
URL: https://github.com/apache/flink/pull/8361#discussion_r28202
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/SetArrayDeque.java
 ##
 @@ -16,20 +16,38 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.io.network.partition.consumer;
+package org.apache.flink.runtime.util;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
- * Listener interface implemented by consumers of {@link InputGate} instances
- * that want to be notified of availability of buffer or event instances.
 * Wrapper around {@link ArrayDeque} with a fast {@link HashSet}-based {@link 
#contains(Object)} method.
  */
-public interface InputGateListener {
+public class SetArrayDeque<T> {
+   private final ArrayDeque<T> deque = new ArrayDeque<>();
+   private final Set<T> set = new HashSet<>();
+
+   public boolean add(T element) {
+   return deque.add(element) || set.add(element);
 
 Review comment:
   I can change it to `void add(T)` ;) But ok, I will add the unit test for 
this.
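
For context, the short-circuit in the snippet above means the set is never populated: ArrayDeque#add always returns true, so set.add(element) is skipped and a set-backed contains() would always report false. One possible shape of the hinted fix, keeping deque and set consistent by treating the structure as a FIFO set, is sketched below; the actual PR may well look different.

{code:java}
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

public class SetArrayDeque<T> {
    private final ArrayDeque<T> deque = new ArrayDeque<>();
    private final Set<T> set = new HashSet<>();

    // Ignore duplicates so deque and set always hold exactly the same elements.
    public boolean add(T element) {
        if (!set.add(element)) {
            return false;
        }
        deque.add(element);
        return true;
    }

    public T remove() {
        T element = deque.remove();
        set.remove(element);
        return element;
    }

    // The point of the wrapper: O(1) contains() backed by the HashSet.
    public boolean contains(Object element) {
        return set.contains(element);
    }

    public int size() {
        return deque.size();
    }
}
{code}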


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates

2019-05-08 Thread GitBox
pnowojski commented on a change in pull request #8361: [FLINK-12434][network] 
Replace listeners with CompletableFuture in InputGates
URL: https://github.com/apache/flink/pull/8361#discussion_r282110599
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ##
 @@ -197,32 +197,49 @@ public void requestPartitions() throws IOException, 
InterruptedException {
return Optional.of(bufferOrEvent);
}
 
-   @Override
-   public Optional<BufferOrEvent> pollNextBufferOrEvent() throws 
UnsupportedOperationException {
-   throw new UnsupportedOperationException();
-   }
-
-   private InputGateWithData waitAndGetNextInputGate() throws IOException, 
InterruptedException {
+   private Optional<InputGateWithData> waitAndGetNextInputGate(boolean 
blocking) throws IOException, InterruptedException {
while (true) {
-   InputGate inputGate;
-   boolean moreInputGatesAvailable;
synchronized (inputGatesWithData) {
while (inputGatesWithData.size() == 0) {
-   inputGatesWithData.wait();
+   if (blocking) {
+   inputGatesWithData.wait();
+   } else {
+   resetIsAvailable();
+   return Optional.empty();
+   }
+   }
+   final InputGate inputGate = 
inputGatesWithData.remove();
+
+   // In case of inputGatesWithData being 
inaccurate do not block on an empty inputGate, but just poll the data.
+   Optional<BufferOrEvent> bufferOrEvent = 
inputGate.pollNextBufferOrEvent();
+
+   if (bufferOrEvent.isPresent() && 
bufferOrEvent.get().moreAvailable()) {
+   // enqueue the inputGate at the end to 
avoid starvation
+   inputGatesWithData.add(inputGate);
+   } else {
+   inputGate.isAvailable().thenRun(() -> 
queueInputGate(inputGate));
 
 Review comment:
   Yes, but this is not an issue here, since if `inputGate.isAvailable() == 
AVAILABLE` it would mean that `bufferOrEvent.get().moreAvailable()` returns 
true - so when hot looping we do not enter this code path.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

