[GitHub] [flink] dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-05-02 Thread GitBox
dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280663142
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {
 rels.map(_.asInstanceOf[ExecNode[_, _]])
   }
 
+  /**
+    * Register a [[ReadableCatalog]] under a unique name.
+    *
+    * @param name the name under which the catalog will be registered
+    * @param catalog the catalog to register
+    * @throws CatalogAlreadyExistsException thrown if the catalog already exists
+    */
+  @throws[CatalogAlreadyExistsException]
+  def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
+    catalogManager.registerCatalog(name, catalog)
+  }
+
+  /**
+    * Get a registered [[ReadableCatalog]].
+    *
+    * @param catalogName name of the catalog to get
+    * @return the requested catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def getCatalog(catalogName: String): ReadableCatalog = {
+    catalogManager.getCatalog(catalogName)
+  }
+
+  /**
+    * Get the current catalog.
+    *
+    * @return the current catalog in CatalogManager
+    */
+  def getCurrentCatalog(): ReadableCatalog = {
+    catalogManager.getCurrentCatalog
+  }
+
+  /**
+    * Get the current database name.
+    *
+    * @return the current database of the current catalog
+    */
+  def getCurrentDatabaseName(): String = {
+    catalogManager.getCurrentCatalog.getCurrentDatabase
+  }
+
+  /**
+    * Set the current catalog.
+    *
+    * @param name name of the catalog to set as current catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def setCurrentCatalog(name: String): Unit = {
+    catalogManager.setCurrentCatalog(name)
+  }
+
+  /**
+    * Set the current catalog and current database.
+    *
+    * @param catalogName name of the catalog to set as current catalog
+    * @param databaseName name of the database to set as current database
+    * @throws CatalogNotExistException  thrown if the catalog doesn't exist
+    * @throws DatabaseNotExistException thrown if the database doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  @throws[DatabaseNotExistException]
+  def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
+    catalogManager.setCurrentCatalog(catalogName)
+    catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
 
 Review comment:
   There can only be a single current database per session, not one current 
DB per catalog, so we don't need a `Map` in `CatalogManager`.
   Have you checked what the SQL standard says about it? As I understand it (and 
have checked), there is always a default prefix per session (either a catalog, or a 
catalog + schema, which is equivalent to a DB), so a current DB is not set per catalog.
   
   What if the same catalog is used by multiple users for different sessions? 
How do you retrieve the original default DB for a new session? As I said 
originally, this is a property of a session, and I strongly believe it should 
not be stored in the catalog. WDYT?
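
   For illustration, a minimal sketch of the session-scoped alternative argued for here (names and types are illustrative, not the PR's actual API): the manager, not the catalog, keeps the current catalog/database pair, so a catalog instance shared across sessions carries no per-session state.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: session state lives in the manager, so a catalog instance
// shared by many sessions carries no per-session "current database".
public class SessionScopedCatalogManager {
	private final Map<String, Object> catalogs = new HashMap<>(); // name -> catalog
	private String currentCatalogName;
	private String currentDatabaseName;

	public void registerCatalog(String name, Object catalog) {
		catalogs.put(name, catalog);
	}

	public void setCurrentDatabase(String catalogName, String databaseName) {
		if (!catalogs.containsKey(catalogName)) {
			throw new IllegalArgumentException("Catalog does not exist: " + catalogName);
		}
		currentCatalogName = catalogName;
		currentDatabaseName = databaseName;
	}

	public String getCurrentDatabaseName() {
		return currentDatabaseName;
	}
}
```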


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-02 Thread GitBox
flinkbot commented on issue #8337: [FLINK-12232][hive] Create HiveCatalog and 
support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337#issuecomment-488921461
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 opened a new pull request #8337: [FLINK-12232][hive] Create HiveCatalog and support database related operations in HiveCatalog

2019-05-02 Thread GitBox
bowenli86 opened a new pull request #8337: [FLINK-12232][hive] Create 
HiveCatalog and support database related operations in HiveCatalog
URL: https://github.com/apache/flink/pull/8337
 
 
   ## What is the purpose of the change
   
   This PR creates HiveCatalog in flink-connector-hive module and implements 
database related APIs for HiveCatalog.
   
   ## Brief change log
   
 - Created HiveCatalog and HiveCatalogDatabase
 - Abstracted common part between HiveCatalog and 
GenericHiveMetastoreCatalog to HiveCatalogBase
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added tests HiveCatalogTest
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   
   Documentation will be added later
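
   A rough sketch of the class layout described in the change log above (shapes only; constructor arguments and shared members are illustrative, not the actual implementation):

```java
// Illustrative only: Hive-metastore plumbing shared in a base class,
// with the two catalog flavors built on top of it.
abstract class HiveCatalogBase {
	protected final String catalogName;

	protected HiveCatalogBase(String catalogName) {
		this.catalogName = catalogName;
	}
	// shared pieces, e.g. metastore-client handling and database operations
}

class GenericHiveMetastoreCatalog extends HiveCatalogBase {
	GenericHiveMetastoreCatalog(String catalogName) { super(catalogName); }
}

class HiveCatalog extends HiveCatalogBase {
	HiveCatalog(String catalogName) { super(catalogName); }
}
```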


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12232) Support database related operations in HiveCatalog

2019-05-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12232:
---
Labels: pull-request-available  (was: )

> Support database related operations in HiveCatalog
> --
>
> Key: FLINK-12232
> URL: https://issues.apache.org/jira/browse/FLINK-12232
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Support database related operations in HiveCatalog, which implements 
> ReadableWritableCatalog API



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280604389
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -229,31 +231,97 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   try {
+   client.dropTable(
+   tablePath.getDatabaseName(),
+   tablePath.getObjectName(),
+   // Indicate whether associated data should be 
deleted.
+   // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
 
 Review comment:
   yeah, later when necessary. Along with 'purge'


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12395) Reconcile description and detailed description in CatalogBaseTable

2019-05-02 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-12395:
-
Description: 
CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} APIs, 
which don't seem to make much sense. I'm not sure what the use case of the 
detailed description is, and how users should specify it in SQL and the Table API. 
We should probably remove the detailed description and rename "description" to 
"comment".

Besides, for simplicity, we should consider just treating "comment" as a 
property in the properties map.
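
A small illustration of the "comment as a property" idea (the property key here is hypothetical; the real key would be decided in the follow-up work):

```java
import java.util.HashMap;
import java.util.Map;

public class CommentAsPropertyDemo {
	// Hypothetical key name for the table comment.
	private static final String COMMENT_KEY = "comment";

	public static void main(String[] args) {
		Map<String, String> properties = new HashMap<>();
		properties.put(COMMENT_KEY, "This is a generic catalog table.");
		// The table's comment is then just another property lookup.
		System.out.println(properties.getOrDefault(COMMENT_KEY, ""));
	}
}
```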


  was:
CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} APIs, 
which don't seem to make much sense. I'm not sure what the use case of the 
detailed description is, and how users should specify it in SQL and the Table API.

We should probably remove the detailed description and treat the description as a comment.


> Reconcile description and detailed description in CatalogBaseTable
> --
>
> Key: FLINK-12395
> URL: https://issues.apache.org/jira/browse/FLINK-12395
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Bowen Li
>Priority: Major
> Fix For: 1.9.0
>
>
> CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} 
> APIs, which don't seem to make much sense. I'm not sure what the use case of 
> the detailed description is, and how users should specify it in SQL and the Table API. 
> We should probably remove the detailed description and rename "description" to 
> "comment".
> Besides, for simplicity, we should consider just treating "comment" as a 
> property in the properties map.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280606701
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -41,28 +41,28 @@
private final List partitionKeys;
// Properties of the table
private final Map properties;
-   // Comment of the table
-   private String comment = "This is a generic catalog table.";
+   // Description of the table
+   private String description = "This is a generic catalog table.";
 
 Review comment:
   reverted. https://issues.apache.org/jira/browse/FLINK-12395


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280606701
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -41,28 +41,28 @@
private final List partitionKeys;
// Properties of the table
private final Map properties;
-   // Comment of the table
-   private String comment = "This is a generic catalog table.";
+   // Description of the table
+   private String description = "This is a generic catalog table.";
 
 Review comment:
   reverted https://issues.apache.org/jira/browse/FLINK-12395


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12395) Reconcile description and detailed description in CatalogBaseTable

2019-05-02 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12395:


 Summary: Reconcile description and detailed description in 
CatalogBaseTable
 Key: FLINK-12395
 URL: https://issues.apache.org/jira/browse/FLINK-12395
 Project: Flink
  Issue Type: Sub-task
Reporter: Bowen Li
 Fix For: 1.9.0


CatalogBaseTable has {{getDescription()}} and {{getDetailedDescription()}} APIs, 
which don't seem to make much sense. I'm not sure what the use case of the 
detailed description is, and how users should specify it in SQL and the Table API.

We should probably remove the detailed description and treat the description as a comment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8329: [FLINK-12239][hive] 
Support table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280604389
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -229,31 +231,97 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   try {
+   client.dropTable(
+   tablePath.getDatabaseName(),
+   tablePath.getObjectName(),
+   // Indicate whether associated data should be 
deleted.
+   // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
 
 Review comment:
   agree, along with 'purge'


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280525021
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FlinkCatalogManager.java
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager implementation for Flink.
+ * TODO: [FLINK-11275] Decouple CatalogManager with Calcite
+ *   Idealy FlinkCatalogManager should be in flink-table-api-java module.
+ *   But due to that it currently depends on Calcite, a dependency that 
flink-table-api-java doesn't have right now.
+ *   We temporarily put FlinkCatalogManager in flink-table-planner-blink.
+ */
+public class FlinkCatalogManager implements CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCatalogManager.class);
+
+   public static final String BUILTIN_CATALOG_NAME = "builtin";
+
+   // The catalog to hold all registered and translated tables
+   // We disable caching here to prevent side effects
+   private CalciteSchema internalSchema = 
CalciteSchema.createRootSchema(false, false);
+   private SchemaPlus rootSchema = internalSchema.plus();
+
+   // A map between names and catalogs.
+   private Map catalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   public FlinkCatalogManager() {
+   LOG.info("Initializing FlinkCatalogManager");
+   catalogs = new HashMap<>();
+
+   GenericInMemoryCatalog inMemoryCatalog = new 
GenericInMemoryCatalog(BUILTIN_CATALOG_NAME);
 
 Review comment:
   Good question. This PR only partially reflects the integration of 
`CatalogManager` and `TableEnv`. We propose adding new APIs to `TableEnv` to 
read/write catalog metadata (which will be a different set of JIRAs). If users 
try to write to a `ReadableCatalog`, a catalog exception will be thrown to 
remind them of that. E.g. for CatalogBaseTable, we'll have:
   
   ```java
   // Can have overloaded methods or a different signature to register a table
   // in a catalog other than the current one.
   void registerTable(String name, CatalogBaseTable table, boolean ignoreIfExists) {
      if (!(catalogManager.getCurrentCatalog() instanceof ReadableWritableCatalog)) {
         throw new CatalogException("xxx is a ReadableCatalog only");
      }
      ReadableWritableCatalog catalog = (ReadableWritableCatalog) catalogManager.getCurrentCatalog();
      catalog.createTable(new ObjectPath(catalog.getCurrentDb(), name), table, ignoreIfExists);
   }
   ```
   
   `ReadableCatalog` and `ReadableWritableCatalog` are inherited from the 
original `ExternalCatalog` and `CrudExternalCatalog`; more context in 
[FLIP-30](https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs).
   
   I believe it's good to have `ReadableCatalog`. Writing sometimes doesn't make 
sense, or simply cannot be supported, in some catalog implementations. E.g. at 
least for now I don't see that the Confluent Registry catalog 
([FLINK-12256](https://issues.apache.org/jira/browse/FLINK-12256)) needs to 
support writing, since writing through the Confluent Registry itself and its 
fancy UI is much more convenient and user-friendly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please 

[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280525021
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FlinkCatalogManager.java
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager implementation for Flink.
+ * TODO: [FLINK-11275] Decouple CatalogManager with Calcite
+ *   Idealy FlinkCatalogManager should be in flink-table-api-java module.
+ *   But due to that it currently depends on Calcite, a dependency that 
flink-table-api-java doesn't have right now.
+ *   We temporarily put FlinkCatalogManager in flink-table-planner-blink.
+ */
+public class FlinkCatalogManager implements CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCatalogManager.class);
+
+   public static final String BUILTIN_CATALOG_NAME = "builtin";
+
+   // The catalog to hold all registered and translated tables
+   // We disable caching here to prevent side effects
+   private CalciteSchema internalSchema = 
CalciteSchema.createRootSchema(false, false);
+   private SchemaPlus rootSchema = internalSchema.plus();
+
+   // A map between names and catalogs.
+   private Map catalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   public FlinkCatalogManager() {
+   LOG.info("Initializing FlinkCatalogManager");
+   catalogs = new HashMap<>();
+
+   GenericInMemoryCatalog inMemoryCatalog = new 
GenericInMemoryCatalog(BUILTIN_CATALOG_NAME);
 
 Review comment:
   Good question. This PR only partially reflects the integration of 
`CatalogManager` and `TableEnv`. We propose adding new APIs to `TableEnv` to 
read/write catalog metadata (which will be a different set of JIRAs). If users 
try to write to a `ReadableCatalog`, a catalog exception will be thrown to 
remind them of that. E.g. for CatalogBaseTable, we'll have:
   
   ```java
   // Can have overloaded methods or a different signature to register a table
   // in a catalog other than the current one.
   void registerTable(String name, CatalogBaseTable table, boolean ignoreIfExists) {
      if (!(catalogManager.getCurrentCatalog() instanceof ReadableWritableCatalog)) {
         throw new CatalogException("xxx is a ReadableCatalog only");
      }
      ReadableWritableCatalog catalog = (ReadableWritableCatalog) catalogManager.getCurrentCatalog();
      catalog.createTable(new ObjectPath(catalog.getCurrentDb(), name), table, ignoreIfExists);
   }
   ```
   
   `ReadableCatalog` and `ReadableWritableCatalog` are inherited from the 
original `ExternalCatalog` and `CrudExternalCatalog`; more context in 
[FLIP-30](https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs).
   
   I believe it's good to have `ReadableCatalog`. Writing doesn't make sense, or 
simply cannot be supported, in some catalog implementations. E.g. at least for 
now I don't see that the Confluent Registry catalog 
([FLINK-12256](https://issues.apache.org/jira/browse/FLINK-12256)) needs to 
support writing, since writing through the Confluent Registry itself and its 
fancy UI is much more convenient and user-friendly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact 

[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280600786
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##
 @@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {
 rels.map(_.asInstanceOf[ExecNode[_, _]])
   }
 
+  /**
+    * Register a [[ReadableCatalog]] under a unique name.
+    *
+    * @param name the name under which the catalog will be registered
+    * @param catalog the catalog to register
+    * @throws CatalogAlreadyExistsException thrown if the catalog already exists
+    */
+  @throws[CatalogAlreadyExistsException]
+  def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
+    catalogManager.registerCatalog(name, catalog)
+  }
+
+  /**
+    * Get a registered [[ReadableCatalog]].
+    *
+    * @param catalogName name of the catalog to get
+    * @return the requested catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def getCatalog(catalogName: String): ReadableCatalog = {
+    catalogManager.getCatalog(catalogName)
+  }
+
+  /**
+    * Get the current catalog.
+    *
+    * @return the current catalog in CatalogManager
+    */
+  def getCurrentCatalog(): ReadableCatalog = {
+    catalogManager.getCurrentCatalog
+  }
+
+  /**
+    * Get the current database name.
+    *
+    * @return the current database of the current catalog
+    */
+  def getCurrentDatabaseName(): String = {
+    catalogManager.getCurrentCatalog.getCurrentDatabase
+  }
+
+  /**
+    * Set the current catalog.
+    *
+    * @param name name of the catalog to set as current catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def setCurrentCatalog(name: String): Unit = {
+    catalogManager.setCurrentCatalog(name)
+  }
+
+  /**
+    * Set the current catalog and current database.
+    *
+    * @param catalogName name of the catalog to set as current catalog
+    * @param databaseName name of the database to set as current database
+    * @throws CatalogNotExistException  thrown if the catalog doesn't exist
+    * @throws DatabaseNotExistException thrown if the database doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  @throws[DatabaseNotExistException]
+  def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
+    catalogManager.setCurrentCatalog(catalogName)
+    catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
 
 Review comment:
   "default database" is only a default value of "current database", and yes, 
"current database" is a session property.
   
   I'm not strong on where to store the "current database". I prefer this way 
because 1) since every catalog can have such a property, we would need a map in 
CatalogManager to maintain the mapping then 2) it matches well with [catalog 
configs in SQL Client yaml file 
](https://docs.google.com/document/d/1ALxfiGZBaZ8KUNJtoT443hReoPoJEdt9Db2wiJ2LuWA/edit?usp=sharing)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness

2019-05-02 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932
 ] 

Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:28 PM:
---

Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

 

override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)
}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask  returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.
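
The failure mode can be reproduced in isolation (a self-contained illustration, not Flink's actual harness code): putting into an immutable empty map throws, while a plain HashMap works, which is what the proposed fix amounts to.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AccumulatorMapDemo {
	public static void main(String[] args) {
		// What the harness currently hands to the mock task:
		Map<String, Object> immutable = Collections.emptyMap();
		// What the fix asks for instead:
		Map<String, Object> mutable = new HashMap<>();

		mutable.put("combiner-in", new Object()); // fine

		try {
			immutable.put("combiner-in", new Object());
		} catch (UnsupportedOperationException e) {
			System.out.println("addAccumulator would fail here: " + e);
		}
	}
}
```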

 

 

 


was (Author: cre...@gmail.com):
Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

 

override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)

  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask  returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.

 

 

 

> change to MockStreamTask breaks OneInputStreamOperatorTestHarness
> -
>
> Key: FLINK-12334
> URL: https://issues.apache.org/jira/browse/FLINK-12334
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.8.0
>Reporter: Cliff Resnick
>Priority: Major
>
> The MockStreamTask that the test harness is created with does not include 
> initialization of an Accumulator Map when using the builder
> [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]
> This results in a TestHarness whose context contains an immutable empty 
> map and is breaking tests. The fix is simple: please include an actual map in 
> the builder call.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness

2019-05-02 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932
 ] 

Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:27 PM:
---

Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)

  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}

 

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask  returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.

 

 

 


was (Author: cre...@gmail.com):
Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:
override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)

  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}
 

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask  returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.

 

 

 

> change to MockStreamTask breaks OneInputStreamOperatorTestHarness
> -
>
> Key: FLINK-12334
> URL: https://issues.apache.org/jira/browse/FLINK-12334
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.8.0
>Reporter: Cliff Resnick
>Priority: Major
>
> The MockStreamTask that the test harness is created with does not include 
> initialization of an Accumulator Map when using the builder
> [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]
> This results in a TestHarness whose context contains an immutable empty 
> map and is breaking tests. The fix is simple: please include an actual map in 
> the builder call.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness

2019-05-02 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932
 ] 

Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:27 PM:
---

Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

 

override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)

  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask  returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.

 

 

 


was (Author: cre...@gmail.com):
Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)

  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}

 

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask  returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.

 

 

 

> change to MockStreamTask breaks OneInputStreamOperatorTestHarness
> -
>
> Key: FLINK-12334
> URL: https://issues.apache.org/jira/browse/FLINK-12334
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.8.0
>Reporter: Cliff Resnick
>Priority: Major
>
> The MockStreamTask that the test harness is created with does not include 
> initialization of an Accumulator Map when using the builder
> [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]
> This results in a TestHarness whose context contains an immutable empty 
> map and is breaking tests. The fix is simple: please include an actual map in 
> the builder call.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness

2019-05-02 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932
 ] 

Cliff Resnick edited comment on FLINK-12334 at 5/2/19 8:26 PM:
---

Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:
override def open(): Unit = {
  getRuntimeContext.addAccumulator("combiner-in", recordsIn)
  getRuntimeContext.addAccumulator("combiner-out", recordsOut)

  getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}
 

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask  returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.

 

 

 


was (Author: cre...@gmail.com):
Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

override def open(): Unit = {
 getRuntimeContext.addAccumulator("combiner-in", recordsIn)
 getRuntimeContext.addAccumulator("combiner-out", recordsOut)

 getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask  returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.

 

 

 

> change to MockStreamTask breaks OneInputStreamOperatorTestHarness
> -
>
> Key: FLINK-12334
> URL: https://issues.apache.org/jira/browse/FLINK-12334
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.8.0
>Reporter: Cliff Resnick
>Priority: Major
>
> The MockStreamTask that the test harness is created with does not include 
> initialization of an Accumulator Map when using the builder
> [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]
> This results in a TestHarness whose context contains an immutable empty 
> map and is breaking tests. The fix is simple: please include an actual map in 
> the builder call.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-12334) change to MockStreamTask breaks OneInputStreamOperatorTestHarness

2019-05-02 Thread Cliff Resnick (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831932#comment-16831932
 ] 

Cliff Resnick commented on FLINK-12334:
---

Hi [~fan_li_ya]

We have a custom StreamOperator with an open() method like:

override def open(): Unit = {
 getRuntimeContext.addAccumulator("combiner-in", recordsIn)
 getRuntimeContext.addAccumulator("combiner-out", recordsOut)

 getProcessingTimeService.scheduleAtFixedRate(this, pause, pause)
}

The `getRuntimeContext.addAccumulator` call no longer works in 1.8 because the 
MockStreamTask  returned by getRuntimeContext is trying to add the accumulator 
to an immutable empty map.

 

 

 

> change to MockStreamTask breaks OneInputStreamOperatorTestHarness
> -
>
> Key: FLINK-12334
> URL: https://issues.apache.org/jira/browse/FLINK-12334
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.8.0
>Reporter: Cliff Resnick
>Priority: Major
>
> The MockStreamTask that the test harness is created with does not include 
> initialization of an Accumulator Map when using the builder
> [https://github.com/apache/flink/blob/e43d55445e7abcadb92460bb4d61e28540f1189d/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java#L198]
> This results in a TestHarness whose context contains an immutable empty 
> map and is breaking tests. The fix is simple: please include an actual map in 
> the builder call.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix bug in statsd exporter when using negative values

2019-05-02 Thread GitBox
Xeli commented on a change in pull request #8259: [FLINK-12325][metrics] Fix 
bug in statsd exporter when using negative values
URL: https://github.com/apache/flink/pull/8259#discussion_r280538063
 
 

 ##
 File path: 
flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 ##
 @@ -216,4 +238,14 @@ public String filterCharacters(String input) {
 
return chars == null ? input : new String(chars, 0, pos);
}
+
+   private boolean numberIsNegative(Number input) {
+   try {
+   return new 
BigDecimal(input.toString()).compareTo(BigDecimal.ZERO) == -1;
 
 Review comment:
   Yeah good call, in this case the rounding that doubleValue might do wouldn't 
matter anyhow. Changed it!
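
   For reference, a minimal sketch of the two approaches being compared (helper names are illustrative, not the reporter's actual code); for a pure sign check on practical metric values, the double-based version is sufficient:

```java
import java.math.BigDecimal;

// Illustrative helpers only: two ways to test whether an arbitrary Number is negative.
public class SignCheck {

	// Exact, but allocates a BigDecimal per call and throws NumberFormatException
	// for NaN/Infinity (hence the try/catch in the original code).
	static boolean isNegativeExact(Number input) {
		return new BigDecimal(input.toString()).compareTo(BigDecimal.ZERO) < 0;
	}

	// Cheaper; any rounding done by doubleValue() does not matter for a sign check
	// on practical metric values.
	static boolean isNegativeViaDouble(Number input) {
		return input.doubleValue() < 0.0;
	}

	public static void main(String[] args) {
		System.out.println(isNegativeExact(-42L) + " " + isNegativeViaDouble(-42L)); // true true
	}
}
```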


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-05-02 Thread GitBox
dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280532280
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FlinkCatalogManager.java
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager implementation for Flink.
+ * TODO: [FLINK-11275] Decouple CatalogManager with Calcite
+ *   Idealy FlinkCatalogManager should be in flink-table-api-java module.
+ *   But due to that it currently depends on Calcite, a dependency that 
flink-table-api-java doesn't have right now.
+ *   We temporarily put FlinkCatalogManager in flink-table-planner-blink.
+ */
+public class FlinkCatalogManager implements CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCatalogManager.class);
+
+   public static final String BUILTIN_CATALOG_NAME = "builtin";
+
+   // The catalog to hold all registered and translated tables
+   // We disable caching here to prevent side effects
+   private CalciteSchema internalSchema = 
CalciteSchema.createRootSchema(false, false);
+   private SchemaPlus rootSchema = internalSchema.plus();
+
+   // A map between names and catalogs.
+   private Map catalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   public FlinkCatalogManager() {
+   LOG.info("Initializing FlinkCatalogManager");
+   catalogs = new HashMap<>();
+
+   GenericInMemoryCatalog inMemoryCatalog = new 
GenericInMemoryCatalog(BUILTIN_CATALOG_NAME);
 
 Review comment:
   I agree that there might be implementations that, for the time being, do not 
support writing and might throw an exception. In general, though, I think all 
implementations do support writing in some way. The owner of a platform might, 
e.g., decide to disallow writing, but that should rather be done with some sort 
of access roles, properties, etc. In the end all catalogs do support writing, 
otherwise there would be no way to enter data into them.
   
   The method you mentioned, `registerTable`, is part of 
`ReadableWritableCatalog`, not `ReadableCatalog`. If the `CatalogManager` works 
only with `ReadableCatalog`, we would have to perform instance checks and hard 
casts all the time, and I see no reason for that. I believe we can just have a 
single `Catalog` interface that might throw an exception if it should not be 
modified, or just expose a flag such as `isMutable`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema

2019-05-02 Thread GitBox
bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280525021
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/catalog/FlinkCatalogManager.java
 ##
 @@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager implementation for Flink.
+ * TODO: [FLINK-11275] Decouple CatalogManager with Calcite
+ *   Idealy FlinkCatalogManager should be in flink-table-api-java module.
+ *   But due to that it currently depends on Calcite, a dependency that 
flink-table-api-java doesn't have right now.
+ *   We temporarily put FlinkCatalogManager in flink-table-planner-blink.
+ */
+public class FlinkCatalogManager implements CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCatalogManager.class);
+
+   public static final String BUILTIN_CATALOG_NAME = "builtin";
+
+   // The catalog to hold all registered and translated tables
+   // We disable caching here to prevent side effects
+   private CalciteSchema internalSchema = 
CalciteSchema.createRootSchema(false, false);
+   private SchemaPlus rootSchema = internalSchema.plus();
+
+   // A map between names and catalogs.
+   private Map catalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   public FlinkCatalogManager() {
+   LOG.info("Initializing FlinkCatalogManager");
+   catalogs = new HashMap<>();
+
+   GenericInMemoryCatalog inMemoryCatalog = new 
GenericInMemoryCatalog(BUILTIN_CATALOG_NAME);
 
 Review comment:
   Good question. This PR only partially reflects the integration of 
`CatalogManager` and `TableEnv`. We propose adding new APIs to `TableEnv` to 
read/write catalog metadata. If users try to write to a `ReadableCatalog`, a 
catalog exception will be thrown to remind them of that. E.g. for 
CatalogBaseTable, we'll have:
   
   ```java
   // Can have overloaded methods or a different signature to register a table
   // in a catalog other than the current one.
   void registerTable(String name, CatalogBaseTable table, boolean ignoreIfExists) {
      if (!(catalogManager.getCurrentCatalog() instanceof ReadableWritableCatalog)) {
         throw new CatalogException("xxx is a ReadableCatalog only");
      }
      ReadableWritableCatalog catalog = (ReadableWritableCatalog) catalogManager.getCurrentCatalog();
      catalog.createTable(new ObjectPath(catalog.getCurrentDb(), name), table, ignoreIfExists);
   }
   ```
   
   `ReadableCatalog` and `ReadableWritableCatalog` are inherited from the 
original `ExternalCatalog` and `CrudExternalCatalog`; more context in 
[FLIP-30](https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs).
   
   I believe it's good to have `ReadableCatalog`. Writing doesn't make sense, or 
simply cannot be supported, in some catalog implementations. E.g. at least for 
now I don't see that the Confluent Registry catalog 
([FLINK-12256](https://issues.apache.org/jira/browse/FLINK-12256)) needs to 
support writing, since writing through the Confluent Registry itself and its 
fancy UI is much more convenient and user-friendly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With 

[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280505199
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalog.java
 ##
 @@ -229,31 +231,97 @@ public void alterDatabase(String name, CatalogDatabase 
newDatabase, boolean igno
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
-   throw new UnsupportedOperationException();
+   try {
+   client.dropTable(
+   tablePath.getDatabaseName(),
+   tablePath.getObjectName(),
+   // Indicate whether associated data should be 
deleted.
+   // Set to 'true' for now because Flink tables 
shouldn't have data in Hive. Can be changed later if necessary
 
 Review comment:
   I think what we can do later is to expose this in the API, something like 
dropTable(ObjectPath, boolean, boolean deleteData).
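   
   A sketch of what that extended signature could look like (a proposal only, not the current catalog API; parameter names are illustrative):
   
```java
// Proposal sketch only: let the caller decide whether dropping the table's
// metadata also deletes the underlying data.
public interface TableOperations {
	void dropTable(String databaseName, String tableName, boolean ignoreIfNotExists, boolean deleteData);
}
```
   
   A Flink-managed table registered in Hive could then be dropped with deleteData = true, while a table whose data is owned elsewhere would pass false.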


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280508961
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericCatalogTable.java
 ##
 @@ -41,28 +41,28 @@
private final List partitionKeys;
// Properties of the table
private final Map properties;
-   // Comment of the table
-   private String comment = "This is a generic catalog table.";
+   // Description of the table
+   private String description = "This is a generic catalog table.";
 
 Review comment:
   Ok. Please revert and create a followup JIRA to track this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280508513
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+   // Flink tables should be stored as 'external' tables in Hive metastore
+   private static final Map EXTERNAL_TABLE_PROPERTY = new 
HashMap() {{
+   put("EXTERNAL", "TRUE");
+   }};
+
private GenericHiveMetastoreCatalogUtil() {
}
 
// -- Utils --
 
/**
-* Creates a Hive database from CatalogDatabase.
+* Creates a Hive database from a CatalogDatabase.
+*
+* @param databaseName name of the database
+* @param catalogDatabase the CatalogDatabase instance
+* @return a Hive database
 */
-   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
-   Map props = db.getProperties();
+   public static Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
return new Database(
-   dbName,
-   db.getDescription().isPresent() ? 
db.getDescription().get() : null,
+   databaseName,
+   catalogDatabase.getDescription().isPresent() ? 
catalogDatabase.getDescription().get() : null,
null,
-   props);
+   catalogDatabase.getProperties());
+   }
+
+   /**
+* Creates a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table the CatalogBaseTable instance
+* @return a Hive table
+*/
+   public static Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
+   Map<String, String> properties = new 
HashMap<>(table.getProperties());
+
+   // Table description
+   if (table.getDescription().isPresent()) {
+   properties.put(HiveTableConfig.TABLE_DESCRITPION, 
table.getDescription().get());
+   }
+
+   Table hiveTable = new Table();
+   hiveTable.setDbName(tablePath.getDatabaseName());
+   hiveTable.setTableName(tablePath.getObjectName());
+   hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+
+   // Table properties
+   hiveTable.setParameters(buildFlinkProperties(properties));
+   hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+   // Hive table's StorageDescriptor
+   StorageDescriptor sd = new StorageDescriptor();
+   sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+   List<FieldSchema> allColumns = 
createHiveColumns(table.getSchema());
+
+   // Table columns and partition keys
+   CatalogTable catalogTable = (CatalogTable) table;
+
+   if (catalogTable.isPartitioned()) {
+   int partitionKeySize = 
catalogTable.getPartitionKeys().size();
+   List<FieldSchema> regularColumns = 
allColumns.subList(0, allColumns.size() - partitionKeySize);
+   List<FieldSchema> partitionColumns = 
allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+   sd.setCols(regularColumns);
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280475679
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##
 @@ -162,10 +164,13 @@ void assignExclusiveSegments(List<MemorySegment> 
segments) {
public void requestSubpartition(int subpartitionIndex) throws 
IOException, InterruptedException {
if (partitionRequestClient == null) {
// Create a client and request the partition
-   partitionRequestClient = connectionManager
-   .createPartitionRequestClient(connectionId);
+   try {
+   partitionRequestClient = 
connectionManager.createPartitionRequestClient(connectionId);
+   } catch (RemoteTransportException ex) {
 
 Review comment:
   Other `IOExceptions` are not relevant here?
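   If they are, a minimal sketch of widening the catch; the `DataConsumptionException` constructor used below is an assumption on my side, not the exact API of this PR:
   ```
   try {
       // wrap every failure of the connection setup, not only RemoteTransportException
       partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
   } catch (IOException ex) {
       throw new DataConsumptionException(partitionId, ex);
   }
   ```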


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280471611
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 ##
 @@ -183,11 +184,17 @@ public void run() {
subpartitionView = checkAndWaitForSubpartitionView();
}
 
-   BufferAndBacklog next = subpartitionView.getNextBuffer();
+   BufferAndBacklog next;
+   try {
+   next = subpartitionView.getNextBuffer();
+   } catch (Throwable t) {
 
 Review comment:
   I think we should not catch `Throwable` here. Instead we should only catch 
the `IOException`.
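   Roughly like this (again, the exact `DataConsumptionException` constructor is an assumption):
   ```
   BufferAndBacklog next;
   try {
       next = subpartitionView.getNextBuffer();
   } catch (IOException e) {
       // only I/O failures are wrapped; other errors keep their original semantics
       throw new DataConsumptionException(partitionId, e);
   }
   ```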


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280481421
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 ##
 @@ -389,25 +401,80 @@ public void testGetNextAfterPartitionReleased() throws 
Exception {
try {
channel.getNextBuffer();
fail("Did not throw expected CancelTaskException");
-   } catch (CancelTaskException ignored) {
+   } catch (DataConsumptionException ignored) {
}
 
channel.releaseAllResources();
assertFalse(channel.getNextBuffer().isPresent());
}
 
+   /**
+* Tests the {@link PartitionNotFoundException} is thrown during 
requesting partition and
+* wrapped into {@link DataConsumptionException}.
+*/
+   @Test
+   public void testDataConsumptionExceptionDuringPartitionRequest() throws 
Exception {
+   Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0);
+
+   ResultPartitionManager partitionManager = new 
ResultPartitionManager();
+
+   ResultPartitionID partitionId = new ResultPartitionID();
+
+   LocalInputChannel ch = createLocalInputChannel(
+   InputChannelTestUtils.createSingleInputGate(1), 
partitionId, partitionManager, backoff);
+
+   try {
+   ch.requestSubpartition(0);
+   fail("Did not throw expected DataConsumptionException");
+   } catch (IOException ex) {
+   assertTrue(ExceptionUtils.findThrowable(ex, 
PartitionNotFoundException.class).isPresent());
+   verifyDataConsumptionException(partitionId, ex);
+   }
+   }
+
+   /**
+* Tests the {@link DataConsumptionException} is wrapped when {@link 
ResultSubpartitionView#getNextBuffer()}
+* throws an exception.
+*/
+   @Test
+   public void testDataConsumptionExceptionDuringGetNextBuffer() throws 
Exception {
+   Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0);
+
+   ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.getNextBuffer()).thenThrow(new IOException("Expected 
test exception"));
+
+   ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
+   when(partitionManager
+   .createSubpartitionView(any(ResultPartitionID.class), 
anyInt(), any(BufferAvailabilityListener.class)))
+   .thenReturn(view);
 
 Review comment:
   Same here. Is it possible to do it without mocking?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280480963
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 ##
 @@ -260,13 +265,20 @@ public void testProducerFailedException() throws 
Exception {
BufferProvider bufferProvider = mock(BufferProvider.class);
when(inputGate.getBufferProvider()).thenReturn(bufferProvider);
 
+   ResultPartitionID partitionId = new ResultPartitionID();
LocalInputChannel ch = createLocalInputChannel(
-   inputGate, partitionManager, new Tuple2<>(0, 
0));
+   inputGate, partitionId, partitionManager, new 
Tuple2<>(0, 0));
 
ch.requestSubpartition(0);
 
-   // Should throw an instance of CancelTaskException.
-   ch.getNextBuffer();
+   // Should throw an instance of DataConsumptionException.
+   try {
+   ch.getNextBuffer();
+   fail("Did not throw expected DataConsumptionException");
+   } catch (IOException ex) {
 
 Review comment:
   Can't we simply catch a `DataConsumptionException` here instead of calling 
`verifyDataConsumptionException`?
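   Something like the following, assuming `DataConsumptionException` is (or extends) an `IOException` so the narrower catch compiles:
   ```
   try {
       ch.getNextBuffer();
       fail("Did not throw expected DataConsumptionException");
   } catch (DataConsumptionException expected) {
       // optionally assert on the cause or the partition id here
   }
   ```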


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280479507
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
 ##
 @@ -383,9 +397,16 @@ private EmbeddedChannel createEmbeddedChannel() {
return new EmbeddedChannel(protocol.getClientChannelHandlers());
}
 
-   private RemoteInputChannel addInputChannel(NetworkClientHandler 
clientHandler)
-   throws IOException {
-   RemoteInputChannel rich = createRemoteInputChannel();
+   private RemoteInputChannel addInputChannel(
+   NetworkClientHandler clientHandler,
+   ResultPartitionID partitionId) throws Exception {
+   RemoteInputChannel rich = new RemoteInputChannel(
+   InputChannelTestUtils.createSingleInputGate(1),
+   0,
+   partitionId,
+   mock(ConnectionID.class),
 
 Review comment:
   Can't we create a `ConnectionID` here?
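   For example (assuming the `ConnectionID(InetSocketAddress, int)` constructor, which is a guess on my side):
   ```
   // instead of mock(ConnectionID.class):
   new ConnectionID(new InetSocketAddress("localhost", 0), 0)
   ```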


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280482206
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ##
 @@ -333,6 +331,70 @@ public void testProducerFailedException() throws 
Exception {
ch.getNextBuffer();
}
 
+   /**
+* Tests that {@link DataConsumptionException} is wrapped if {@link 
RemoteTransportException}
+* is thrown during creating {@link PartitionRequestClient}.
+*/
+   @Test
+   public void testDataConsumptionExceptionDuringCreatingClient() throws 
Exception {
+   final ConnectionManager connManager = 
mock(ConnectionManager.class);
+   
when(connManager.createPartitionRequestClient(any(ConnectionID.class)))
+   .thenThrow(RemoteTransportException.class);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final RemoteInputChannel ch = 
createRemoteInputChannel(partitionId, connManager);
+
+   try {
+   ch.requestSubpartition(0);
+   fail("Did not throw expected DataConsumptionException");
+   } catch (IOException ex) {
+   verifyDataConsumptionException(partitionId, ex);
+   }
+   }
+
+   /**
+* Tests that {@link DataConsumptionException} is wrapped if an 
exception is thrown
+* during requesting partition.
+*/
+   @Test
+   public void testDataConsumptionExceptionDuringPartitionRequest() throws 
Exception {
+   final ConnectionManager connManager = 
mock(ConnectionManager.class);
+   final PartitionRequestClient client = 
mock(PartitionRequestClient.class);
+   
when(connManager.createPartitionRequestClient(any(ConnectionID.class)))
+   .thenReturn(client);
+
+   final ResultPartitionID partitionId = new ResultPartitionID();
+   final RemoteInputChannel ch = 
createRemoteInputChannel(partitionId, connManager);
+   when(client.requestSubpartition(partitionId, 0, ch, 
0)).thenThrow(IOException.class);
 
 Review comment:
   Can we do these tests without the extensive mocking? I think this is better 
for maintainability.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280481073
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 ##
 @@ -389,25 +401,80 @@ public void testGetNextAfterPartitionReleased() throws 
Exception {
try {
channel.getNextBuffer();
fail("Did not throw expected CancelTaskException");
-   } catch (CancelTaskException ignored) {
+   } catch (DataConsumptionException ignored) {
}
 
channel.releaseAllResources();
assertFalse(channel.getNextBuffer().isPresent());
}
 
+   /**
+* Tests the {@link PartitionNotFoundException} is thrown during 
requesting partition and
+* wrapped into {@link DataConsumptionException}.
+*/
+   @Test
+   public void testDataConsumptionExceptionDuringPartitionRequest() throws 
Exception {
+   Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0);
+
+   ResultPartitionManager partitionManager = new 
ResultPartitionManager();
+
+   ResultPartitionID partitionId = new ResultPartitionID();
+
+   LocalInputChannel ch = createLocalInputChannel(
+   InputChannelTestUtils.createSingleInputGate(1), 
partitionId, partitionManager, backoff);
+
+   try {
+   ch.requestSubpartition(0);
+   fail("Did not throw expected DataConsumptionException");
+   } catch (IOException ex) {
 
 Review comment:
   Same here, why not simply catching a `DataConsumptionException`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280480320
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 ##
 @@ -389,25 +401,80 @@ public void testGetNextAfterPartitionReleased() throws 
Exception {
try {
channel.getNextBuffer();
fail("Did not throw expected CancelTaskException");
 
 Review comment:
   Please update the `fail(...)` message; it still mentions `CancelTaskException`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280481273
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 ##
 @@ -389,25 +401,80 @@ public void testGetNextAfterPartitionReleased() throws 
Exception {
try {
channel.getNextBuffer();
fail("Did not throw expected CancelTaskException");
-   } catch (CancelTaskException ignored) {
+   } catch (DataConsumptionException ignored) {
}
 
channel.releaseAllResources();
assertFalse(channel.getNextBuffer().isPresent());
}
 
+   /**
+* Tests the {@link PartitionNotFoundException} is thrown during 
requesting partition and
+* wrapped into {@link DataConsumptionException}.
+*/
+   @Test
+   public void testDataConsumptionExceptionDuringPartitionRequest() throws 
Exception {
+   Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0);
+
+   ResultPartitionManager partitionManager = new 
ResultPartitionManager();
+
+   ResultPartitionID partitionId = new ResultPartitionID();
+
+   LocalInputChannel ch = createLocalInputChannel(
+   InputChannelTestUtils.createSingleInputGate(1), 
partitionId, partitionManager, backoff);
+
+   try {
+   ch.requestSubpartition(0);
+   fail("Did not throw expected DataConsumptionException");
+   } catch (IOException ex) {
+   assertTrue(ExceptionUtils.findThrowable(ex, 
PartitionNotFoundException.class).isPresent());
+   verifyDataConsumptionException(partitionId, ex);
+   }
+   }
+
+   /**
+* Tests the {@link DataConsumptionException} is wrapped when {@link 
ResultSubpartitionView#getNextBuffer()}
+* throws an exception.
+*/
+   @Test
+   public void testDataConsumptionExceptionDuringGetNextBuffer() throws 
Exception {
+   Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0);
+
+   ResultSubpartitionView view = 
mock(ResultSubpartitionView.class);
+   when(view.getNextBuffer()).thenThrow(new IOException("Expected 
test exception"));
 
 Review comment:
   Can we do this without mocking?
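   For example with a tiny hand-written test double; `NoOpResultSubpartitionView` is a hypothetical no-op base class for the test, not an existing utility:
   ```
   // Only getNextBuffer() has behaviour, everything else stays a no-op.
   private static class FailingSubpartitionView extends NoOpResultSubpartitionView {
       @Override
       public BufferAndBacklog getNextBuffer() throws IOException {
           throw new IOException("Expected test exception");
       }
   }
   ```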


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] Introduce the DataConsumptionException for downstream task failure

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8242: [FLINK-6227][network] 
Introduce the DataConsumptionException for downstream task failure
URL: https://github.com/apache/flink/pull/8242#discussion_r280473083
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 ##
 @@ -127,7 +128,7 @@ void requestSubpartition(int subpartitionIndex) throws 
IOException, InterruptedE
if (increaseBackoff()) {
 
 Review comment:
   What about other `IOExceptions` which might be thrown out of the 
`ResultPartitionManager#createSubpartitionView`? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-12389) flink codegen set String type for ByteBuffer fields

2019-05-02 Thread Yu Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831745#comment-16831745
 ] 

Yu Yang edited comment on FLINK-12389 at 5/2/19 4:42 PM:
-

[~twalthr], [~fhueske] thanks for the reply!  given that this issue is with 
legacy planner, shall we try with flink-table-planner-blink? any pointers on 
the current status and the community's plan on flink-table-planner-blink and 
flink-table-runtime-blink? 

[~ykt836], [~jinyu.zj]  could you share any insights on this? ^^


was (Author: yuyang08):
[~twalthr], [~fhueske] thanks for the reply!  given that this issue is with 
legacy planner, shall we try with flink-table-planner-blink? any pointers on 
the current status and the community's plan on flink-table-planner-blink and 
flink-table-runtime-blink? 

> flink codegen set String type for ByteBuffer fields
> ---
>
> Key: FLINK-12389
> URL: https://issues.apache.org/jira/browse/FLINK-12389
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.0
>Reporter: Yu Yang
>Priority: Major
>
> We try to write a simple flink sql program using "select  .. from " 
> statement, and encounter  a compile exception. 
> *Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"*
> Further debugging shows that the following flink-generated code snippet 
> caused the problem: 
> {code}
>   final java.lang.reflect.Field 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds =
>   org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField(
> com.pinterest.utzv2.thrift.RealtimeSpendValue.class, "segmentIds");
> ...
> boolean isNull$5 = (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1) == 
> null;
> java.lang.String result$4;
> if (isNull$5) {
>   result$4 = "";
> }
> else {
>   result$4 = (java.lang.String) (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1);
> }
>
> {code}
> The following is the stack trace:
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 17 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:425)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049)
>   at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>   at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790)
>   at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732)
>   at org.codehaus.janino.Java$Assignment.accept(Java.java:4466)
>   at 

[GitHub] [flink] xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support table related operations in GenericHiveMetastoreCatalog

2019-05-02 Thread GitBox
xuefuz commented on a change in pull request #8329: [FLINK-12239][hive] Support 
table related operations in GenericHiveMetastoreCatalog
URL: https://github.com/apache/flink/pull/8329#discussion_r280503007
 
 

 ##
 File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/GenericHiveMetastoreCatalogUtil.java
 ##
 @@ -18,32 +18,179 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.GenericCatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.plan.stats.TableStats;
 
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.stream.Collectors;
 
 /**
  * Utils to convert meta objects between Flink and Hive for 
GenericHiveMetastoreCatalog.
  */
 public class GenericHiveMetastoreCatalogUtil {
 
+   // Prefix used to distinguish properties created by Hive and Flink,
+   // as Hive metastore has its own properties created upon table creation 
and migration between different versions of metastore.
+   private static final String FLINK_PROPERTY_PREFIX = "flink.";
+
+   // Flink tables should be stored as 'external' tables in Hive metastore
+   private static final Map<String, String> EXTERNAL_TABLE_PROPERTY = new 
HashMap<String, String>() {{
+   put("EXTERNAL", "TRUE");
+   }};
+
private GenericHiveMetastoreCatalogUtil() {
}
 
// -- Utils --
 
/**
-* Creates a Hive database from CatalogDatabase.
+* Creates a Hive database from a CatalogDatabase.
+*
+* @param databaseName name of the database
+* @param catalogDatabase the CatalogDatabase instance
+* @return a Hive database
 */
-   public static Database createHiveDatabase(String dbName, 
CatalogDatabase db) {
-   Map<String, String> props = db.getProperties();
+   public static Database createHiveDatabase(String databaseName, 
CatalogDatabase catalogDatabase) {
return new Database(
-   dbName,
-   db.getDescription().isPresent() ? 
db.getDescription().get() : null,
+   databaseName,
+   catalogDatabase.getDescription().isPresent() ? 
catalogDatabase.getDescription().get() : null,
null,
-   props);
+   catalogDatabase.getProperties());
+   }
+
+   /**
+* Creates a Hive table from a CatalogBaseTable.
+*
+* @param tablePath path of the table
+* @param table the CatalogBaseTable instance
+* @return a Hive table
+*/
+   public static Table createHiveTable(ObjectPath tablePath, 
CatalogBaseTable table) {
+   Map<String, String> properties = new 
HashMap<>(table.getProperties());
+
+   // Table description
+   if (table.getDescription().isPresent()) {
+   properties.put(HiveTableConfig.TABLE_DESCRITPION, 
table.getDescription().get());
+   }
+
+   Table hiveTable = new Table();
+   hiveTable.setDbName(tablePath.getDatabaseName());
+   hiveTable.setTableName(tablePath.getObjectName());
+   hiveTable.setCreateTime((int) (System.currentTimeMillis() / 
1000));
+
+   // Table properties
+   hiveTable.setParameters(buildFlinkProperties(properties));
+   hiveTable.getParameters().putAll(EXTERNAL_TABLE_PROPERTY);
+
+   // Hive table's StorageDescriptor
+   StorageDescriptor sd = new StorageDescriptor();
+   sd.setSerdeInfo(new SerDeInfo(null, null, new HashMap<>()));
+
+   List<FieldSchema> allColumns = 
createHiveColumns(table.getSchema());
+
+   // Table columns and partition keys
+   CatalogTable catalogTable = (CatalogTable) table;
+
+   if (catalogTable.isPartitioned()) {
+   int partitionKeySize = 
catalogTable.getPartitionKeys().size();
+   List<FieldSchema> regularColumns = 
allColumns.subList(0, allColumns.size() - partitionKeySize);
+   List<FieldSchema> partitionColumns = 
allColumns.subList(allColumns.size() - partitionKeySize, allColumns.size());
+
+   sd.setCols(regularColumns);
+   

[GitHub] [flink] flinkbot commented on issue #8336: [FLINK-12305][docs][table] Improve Table API/SQL time attribute documentation.

2019-05-02 Thread GitBox
flinkbot commented on issue #8336: [FLINK-12305][docs][table] Improve Table 
API/SQL time attribute documentation.
URL: https://github.com/apache/flink/pull/8336#issuecomment-488744275
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12305) Table API Clarification

2019-05-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12305:
---
Labels: pull-request-available  (was: )

> Table API Clarification
> ---
>
> Key: FLINK-12305
> URL: https://issues.apache.org/jira/browse/FLINK-12305
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Affects Versions: 1.8.0
>Reporter: Alex Barnes
>Assignee: Fabian Hueske
>Priority: Minor
>  Labels: pull-request-available
>
> It is not clear from the documentation if late arriving data is correctly 
> handled in the Flink Table/SQL APIs. The documentation makes passing 
> reference to recognizing late arriving data, but does not go into depth as to 
> what kind of triggering/processing can be performed on it 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time]
> This old email thread on the apache-flink-users mailing list tells a 
> different story - specifically that late arriving data is not supported and 
> DataStream APIs need to be used instead:
> [http://osdir.com/apache-flink-users/msg08110.html]
> Has support been added since that email correspondence? Please consider 
> reducing ambiguity in the documentation and update it to better reflect the 
> current/planned state of support for late arriving data in the Table API.
> Thanks,
> Alex



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] fhueske opened a new pull request #8336: [FLINK-12305][docs][table] Improve Table API/SQL time attribute documentation.

2019-05-02 Thread GitBox
fhueske opened a new pull request #8336: [FLINK-12305][docs][table] Improve 
Table API/SQL time attribute documentation.
URL: https://github.com/apache/flink/pull/8336
 
 
   ## Contribution Checklist
   
   ## What is the purpose of the change
   
   * Improve introduction of processing time and event time support of Table 
API / SQL.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12389) flink codegen set String type for ByteBuffer fields

2019-05-02 Thread Yu Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16831745#comment-16831745
 ] 

Yu Yang commented on FLINK-12389:
-

[~twalthr], [~fhueske] thanks for the reply!  given that this issue is with 
legacy planner, shall we try with flink-table-planner-blink? any pointers on 
the current status and the community's plan on flink-table-planner-blink and 
flink-table-runtime-blink? 

> flink codegen set String type for ByteBuffer fields
> ---
>
> Key: FLINK-12389
> URL: https://issues.apache.org/jira/browse/FLINK-12389
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Legacy Planner
>Affects Versions: 1.8.0
>Reporter: Yu Yang
>Priority: Major
>
> We try to write a simple flink sql program using "select  .. from " 
> statement, and encounter  a compile exception. 
> *Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"*
> Further debugging shows that the following flink-generated code snippet 
> caused the problem: 
> {code}
>   final java.lang.reflect.Field 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds =
>   org.apache.flink.api.java.typeutils.TypeExtractor.getDeclaredField(
> com.pinterest.utzv2.thrift.RealtimeSpendValue.class, "segmentIds");
> ...
> boolean isNull$5 = (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1) == 
> null;
> java.lang.String result$4;
> if (isNull$5) {
>   result$4 = "";
> }
> else {
>   result$4 = (java.lang.String) (java.nio.ByteBuffer) 
> field_com$pinterest$utzv2$thrift$RealtimeSpendValue_segmentIds.get(in1);
> }
>
> {code}
> The following is the stack trace:
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>   ... 17 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>   at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
>   at 
> org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:425)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:291)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 110, Column 
> 38: Cannot cast "java.nio.ByteBuffer" to "java.lang.String"
>   at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>   at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049)
>   at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>   at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>   at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>   at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>   at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3790)
>   at org.codehaus.janino.UnitCompiler.access$6100(UnitCompiler.java:215)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3752)
>   at 
> org.codehaus.janino.UnitCompiler$13.visitAssignment(UnitCompiler.java:3732)
>   at org.codehaus.janino.Java$Assignment.accept(Java.java:4466)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>   at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>   at 
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>   

[jira] [Assigned] (FLINK-12305) Table API Clarification

2019-05-02 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-12305:
-

Assignee: Fabian Hueske

> Table API Clarification
> ---
>
> Key: FLINK-12305
> URL: https://issues.apache.org/jira/browse/FLINK-12305
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Affects Versions: 1.8.0
>Reporter: Alex Barnes
>Assignee: Fabian Hueske
>Priority: Minor
>
> It is not clear from the documentation if late arriving data is correctly 
> handled in the Flink Table/SQL APIs. The documentation makes passing 
> reference to recognizing late arriving data, but does not go into depth as to 
> what kind of triggering/processing can be performed on it 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time]
> This old email thread on the apache-flink-users mailing list tells a 
> different story - specifically that late arriving data is not supported and 
> DataStream APIs need to be used instead:
> [http://osdir.com/apache-flink-users/msg08110.html]
> Has support been added since that email correspondence? Please consider 
> reducing ambiguity in the documentation and update it to better reflect the 
> current/planned state of support for late arriving data in the Table API.
> Thanks,
> Alex



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-02 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280485979
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/CharType.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Logical type of a fixed-length character string.
+ *
+ * The serialized string representation is {@code CHAR(n)} where {@code n} 
is the number of
+ * code points. {@code n} must have a value between 1 and 255 (both 
inclusive). If no length is
+ * specified, {@code n} is equal to 1.
+ */
+@PublicEvolving
+public final class CharType extends LogicalType {
+
+   private static final int MIN_LENGTH = 1;
+
+   private static final int MAX_LENGTH = 255;
+
+   private static final int DEFAULT_LENGTH = 1;
+
+   private static final String DEFAULT_FORMAT = "CHAR(%d)";
 
 Review comment:
   nit: `DEFAULT_FORMAT` -> `FORMAT`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-02 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280484675
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalTypeRoot.java
 ##
 @@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * An enumeration of logical type roots containing static information about 
logical data types.
+ *
+ * A root is an essential description of a {@link LogicalType} without 
additional parameters. For
+ * example, a parameterized logical type {@code DECIMAL(12,3)} possesses all 
characteristics of its
+ * root {@code DECIMAL}. Additionally, a logical type root enables efficient 
comparision during the
+ * evaluation of types.
+ *
+ * The enumeration is very close to the SQL standard in terms of naming and 
completeness. However,
+ * it reflects just a subset of the evolving standard and contains some 
extensions (such as {@code NULL}
+ * or {@code ANY}).
+ *
+ * See the type-implementing classes for a more detailed description of 
each type.
+ */
+@PublicEvolving
+public enum LogicalTypeRoot {
+
+   CHAR(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.CHARACTER_STRING),
+
+   VARCHAR(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.CHARACTER_STRING),
+
+   BOOLEAN(
+   LogicalTypeFamily.PREDEFINED),
+
+   BINARY(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.BINARY_STRING),
+
+   VARBINARY(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.BINARY_STRING),
+
+   DECIMAL(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.NUMERIC,
+   LogicalTypeFamily.EXACT_NUMERIC),
+
+   TINYINT(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.NUMERIC,
+   LogicalTypeFamily.EXACT_NUMERIC),
+
+   SMALLINT(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.NUMERIC,
+   LogicalTypeFamily.EXACT_NUMERIC),
+
+   INTEGER(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.NUMERIC,
+   LogicalTypeFamily.EXACT_NUMERIC),
+
+   BIGINT(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.NUMERIC,
+   LogicalTypeFamily.EXACT_NUMERIC),
+
+   FLOAT(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.NUMERIC,
+   LogicalTypeFamily.APPROXIMATE_NUMERIC),
+
+   DOUBLE(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.NUMERIC,
+   LogicalTypeFamily.APPROXIMATE_NUMERIC),
+
+   DATE(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.DATETIME),
+
+   TIME_WITHOUT_TIME_ZONE(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.DATETIME),
+
+   TIMESTAMP_WITHOUT_TIME_ZONE(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.DATETIME,
+   LogicalTypeFamily.TIMESTAMP),
+
+   TIMESTAMP_WITH_TIME_ZONE(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.DATETIME,
+   LogicalTypeFamily.TIMESTAMP),
+
+   TIMESTAMP_WITH_LOCAL_TIME_ZONE(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.DATETIME,
+   LogicalTypeFamily.TIMESTAMP,
+   LogicalTypeFamily.EXTENSION),
+
+   INTERVAL_YEAR_MONTH(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.INTERVAL),
+
+   INTERVAL_DAY_TIME(
+   LogicalTypeFamily.PREDEFINED,
+   LogicalTypeFamily.INTERVAL),
+
+   ARRAY(
+   LogicalTypeFamily.CONSTRUCTED,
+   

[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-02 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280487361
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LogicalType.java
 ##
 @@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A logical type that describes the data type of a value. It does not imply a 
concrete physical
+ * representation for transmission or storage but defines the boundaries 
between JVM-based languages
+ * and the table ecosystem.
+ *
+ * The definition of a logical type is similar to the SQL standard's "data 
type" terminology but
+ * also contains information about the nullability of a value for efficient 
handling of scalar
+ * expressions.
+ *
+ * Subclasses of this class define characteristics of built-in or 
user-defined types.
+ *
+ * Instances of this class describe the fully parameterized, immutable type 
with additional
+ * information such as numeric precision or expected length.
+ *
+ * NOTE: A logical type is just a description of a type, a planner or 
runtime might not support
+ * every type in every logical precision yet!
+ */
+@PublicEvolving
+public abstract class LogicalType implements Serializable {
+
+   private final boolean isNullable;
+
+   private final LogicalTypeRoot typeRoot;
+
+   public LogicalType(boolean isNullable, LogicalTypeRoot typeRoot) {
+   this.isNullable = isNullable;
+   this.typeRoot = Preconditions.checkNotNull(typeRoot);
+   }
+
+   /**
+* Returns whether a value of this type can be {@code null}.
+*/
+   public boolean isNullable() {
+   return isNullable;
+   }
+
+   /**
+* Returns the root of this type. It is an essential description 
without additional parameters.
+*/
+   public LogicalTypeRoot getTypeRoot() {
+   return typeRoot;
+   }
+
+   /**
+* Returns a deep copy of this type with possibly different nullability.
+*
+* @param isNullable the intended nullability of the copied type
+* @return a deep copy
+*/
+   public abstract LogicalType copy(boolean isNullable);
+
+   /**
+* Returns a deep copy of this type.
+*
+* @return a deep copy
+*/
+   public LogicalType copy() {
 
 Review comment:
   Make this method `final` with a comment to implement `LogicalType 
copy(boolean isNullable)` instead?
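   Something along these lines (the delegation body is my guess based on the fields above):
   ```
   /**
    * Returns a deep copy of this type.
    *
    * <p>Subclasses must implement {@link #copy(boolean)}; this method is final
    * and only delegates to it with the current nullability.
    */
   public final LogicalType copy() {
       return copy(isNullable);
   }
   ```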


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-02 Thread GitBox
dawidwys commented on a change in pull request #8335: 
[FLINK-12253][table-common] Setup a class hierarchy for the new logical type 
system
URL: https://github.com/apache/flink/pull/8335#discussion_r280480232
 
 

 ##
 File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypesTest.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.junit.Assert;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for subclasses of {@link 
org.apache.flink.table.types.logical.LogicalType}.
+ */
+public class LogicalTypesTest {
+
+   // 

+
+   private static void testAll(
+   LogicalType nullableType,
+   String typeString,
+   Class[] input,
 
 Review comment:
   `input` -> `supportedInputClasses`
   `output` -> `supportedOutputClasses`
   
   or a Javadoc?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280463187
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
 ##
 @@ -187,8 +186,6 @@ private void decorateNode(Integer vertexID, ObjectNode 
node) {
node.put(PACT, "Operator");
}
 
-   StreamOperator operator = 
streamGraph.getStreamNode(vertexID).getOperator();
 
 Review comment:
   Is this change relevant to the rest of the PR? If not, could you extract 
changes in this file to a separate `[hotfix]` commit?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280472961
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/OneInputOperatorWrapper.java
 ##
 @@ -52,7 +53,9 @@ public 
OneInputOperatorWrapper(GeneratedClass> g
public void setup(StreamTask containingTask, StreamConfig config,
Output> output) {
operator = 
generatedClass.newInstance(containingTask.getUserCodeClassLoader());
-   operator.setup(containingTask, config, output);
+   if (operator instanceof SetupableStreamOperator) {
+   ((SetupableStreamOperator) 
operator).setup(containingTask, config, output);
 
 Review comment:
   Would it be viable to migrate this already to the non setupable operator? So 
to replace this if check with:
   ```
   operator = 
generatedClass.newInstance(containingTask.getUserCodeClassLoader(), 
containingTask, config, output);
   ```
   ?
   
   If it's non trivial it could be done as some kind of follow up step.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280466736
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ##
 @@ -480,13 +483,17 @@ private StreamGraph 
generateInternal(List> transformatio
streamGraph.addSource(source.getId(),
slotSharingGroup,
source.getCoLocationGroupKey(),
-   source.getOperator(),
+   source.getOperatorFactory(),
null,
source.getOutputType(),
"Source: " + source.getName());
-   if (source.getOperator().getUserFunction() instanceof 
InputFormatSourceFunction) {
-   InputFormatSourceFunction fs = 
(InputFormatSourceFunction) source.getOperator().getUserFunction();
-   streamGraph.setInputFormat(source.getId(), 
fs.getFormat());
+   if (source.getOperatorFactory() instanceof 
SimpleOperatorFactory) {
 
 Review comment:
   Possible issue: this code doesn't support setting input formats for non 
`SimpleOperatorFactories`. Implementing the check this way covers all of the 
present cases, but it is a bit strange that the new code path doesn't support 
it: we have no migration path for getting rid of `StreamOperators` from the 
`StreamTransformation`.
   
   Could this if check be reworked to something like:
   ```
   if (source.getOperatorFactory() instanceof InputFormatSourceOperatorFactory) {
 streamGraph.setInputFormat(id, ((InputFormatSourceOperatorFactory) 
source.getOperatorFactory()).getFormat());
   }
   ```
   ?
   Combination of `SimpleOperatorFactory` and 
`InputFormatSourceOperatorFactory` could implement `getFormat() { return 
this.getOperator().getUserFunction().getFormat()`.
   
   while future non `SimpleOperatorFactory` could be supported as well.
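   For concreteness, a minimal sketch of what that factory shape could look like. The
   interface, the class names and the `SimpleOperatorFactory` constructor used below follow
   the suggestion above and are assumptions, not an existing API:
   ```
   /** Sketch only: a factory that can expose the InputFormat of the source it creates. */
   public interface InputFormatSourceOperatorFactory<OUT> extends StreamOperatorFactory<OUT> {
       InputFormat<OUT, ?> getFormat();
   }

   /** Sketch only: SimpleOperatorFactory variant for sources backed by an InputFormatSourceFunction. */
   public class SimpleInputFormatOperatorFactory<OUT> extends SimpleOperatorFactory<OUT>
           implements InputFormatSourceOperatorFactory<OUT> {

       private final StreamSource<OUT, InputFormatSourceFunction<OUT>> operator;

       public SimpleInputFormatOperatorFactory(StreamSource<OUT, InputFormatSourceFunction<OUT>> operator) {
           super(operator); // assumed SimpleOperatorFactory constructor
           this.operator = operator;
       }

       @Override
       public InputFormat<OUT, ?> getFormat() {
           // delegate to the wrapped user function, as described above
           return operator.getUserFunction().getFormat();
       }
   }
   ```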


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280467152
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
 ##
 @@ -252,9 +253,12 @@ private boolean generateNodeHash(
 
if (LOG.isDebugEnabled()) {
String udfClassName = "";
-   if (node.getOperator() instanceof 
AbstractUdfStreamOperator) {
-   udfClassName = ((AbstractUdfStreamOperator) node.getOperator())
-   
.getUserFunction().getClass().getName();
+   if (node.getOperatorFactory() instanceof 
SimpleOperatorFactory) {
 
 Review comment:
   ditto about factories. `if (node.getOperatorFactory() instanceof 
UdfStreamOperatorFactory)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280470782
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.io.Serializable;
+
+/**
+ * A factory to create {@link StreamOperator}.
+ *
+ * @param <OUT> The output type of the operator
+ */
 
 Review comment:
   `@PublicEvolving`? `@Internal`? (question to someone from API team)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280474556
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SetupableStreamOperator.java
 ##
 @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Stream operators can implement this interface if they need access to the 
context and the output.
+ *
+ * @param <OUT> The output type of the operator
+ */
+@PublicEvolving
+public interface SetupableStreamOperator<OUT> {
 
 Review comment:
   Mark the class `@Deprecated` and add Javadoc along the lines of:
   > This class is deprecated in favour of using `StreamOperatorFactory` and its
   `StreamOperatorFactory#createStreamOperator`, passing the required parameters to the
   `Operator`'s constructor in the create method.
   
   ?
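   Spelled out as a sketch (wording and placement are illustrative only):
   ```
   /**
    * Stream operators can implement this interface if they need access to the context and the output.
    *
    * @param <OUT> The output type of the operator
    *
    * @deprecated This interface is deprecated in favour of {@link StreamOperatorFactory};
    *             pass the required parameters to the operator's constructor inside
    *             {@code StreamOperatorFactory#createStreamOperator} instead.
    */
   @Deprecated
   @PublicEvolving
   public interface SetupableStreamOperator<OUT> {

       /** Initializes the operator. Sets access to the context and the output. */
       void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
   }
   ```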
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280467451
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ##
 @@ -648,12 +649,14 @@ private void configureCheckpointing() {
final ArrayList hooks = new 
ArrayList<>();
 
for (StreamNode node : streamGraph.getStreamNodes()) {
-   StreamOperator op = node.getOperator();
-   if (op instanceof AbstractUdfStreamOperator) {
-   Function f = ((AbstractUdfStreamOperator) 
op).getUserFunction();
-
-   if (f instanceof WithMasterCheckpointHook) {
-   hooks.add(new 
FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook) f));
+   if (node.getOperatorFactory() instanceof 
SimpleOperatorFactory) {
 
 Review comment:
   ditto `if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] piyushnarang commented on a change in pull request #8117: [FLINK-12115] [filesystems]: Add support for AzureFS

2019-05-02 Thread GitBox
piyushnarang commented on a change in pull request #8117: [FLINK-12115] 
[filesystems]: Add support for AzureFS
URL: https://github.com/apache/flink/pull/8117#discussion_r280465958
 
 

 ##
 File path: 
flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java
 ##
 @@ -48,6 +55,33 @@ public void configure(Configuration config) {
public FileSystem create(URI fsUri) throws IOException {
checkNotNull(fsUri, "passed file system URI object should not 
be null");
LOG.info("Trying to load and instantiate Azure File System");
-   return new AzureFileSystem(fsUri, flinkConfig);
+   return new HadoopFileSystem(createInitializedAzureFS(fsUri, 
flinkConfig));
+   }
+
+   // uri is of the form: 
wasb(s)://yourcontai...@youraccount.blob.core.windows.net/testDir
+   private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(URI 
fsUri, Configuration flinkConfig) throws IOException {
+   org.apache.hadoop.conf.Configuration hadoopConfig = 
HadoopUtils.getHadoopConfiguration(flinkConfig);
 
 Review comment:
   Updated the review to incorporate this. I've filed: 
https://jira.apache.org/jira/browse/FLINK-12394


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12394) Update FileSystem factory classes to use HadoopConfigLoader

2019-05-02 Thread Piyush Narang (JIRA)
Piyush Narang created FLINK-12394:
-

 Summary: Update FileSystem factory classes to use 
HadoopConfigLoader
 Key: FLINK-12394
 URL: https://issues.apache.org/jira/browse/FLINK-12394
 Project: Flink
  Issue Type: Task
Reporter: Piyush Narang


As part of the review feedback in PR https://github.com/apache/flink/pull/8117, we noticed
that a couple of existing FileSystem factories manually copy the relevant config entries
from the Flink config to the Hadoop config as part of the factory configuration. We already
have a class that does this - HadoopConfigLoader. This JIRA tracks updating these file
system factories to switch over to the HadoopConfigLoader once the above PR is merged.
Factories that need to be fixed:
SwiftFileSystemFactory
OSSFileSystemFactory
HadoopFsFactory 
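For context, the manual copying these factories currently do amounts to mirroring prefixed
Flink options into a Hadoop Configuration, roughly as in the sketch below. The "fs.swift."
prefix and the surrounding names are illustrative only; the exact HadoopConfigLoader API is
not shown here:
```
// Sketch of the hand-rolled copying that HadoopConfigLoader is meant to replace.
org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
for (String key : flinkConfig.keySet()) {
    if (key.startsWith("fs.swift.")) {
        String value = flinkConfig.getString(key, null);
        if (value != null) {
            hadoopConfig.set(key, value);
        }
    }
}
```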



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode

2019-05-02 Thread GitBox
lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application 
can't stop when flink job failed in per-job yarn cluste mode
URL: https://github.com/apache/flink/pull/8254#issuecomment-488708907
 
 
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode

2019-05-02 Thread GitBox
tillrohrmann commented on issue #8254: [FLINK-12219][runtime] Yarn application 
can't stop when flink job failed in per-job yarn cluste mode
URL: https://github.com/apache/flink/pull/8254#issuecomment-488708092
 
 
   The commit message and the PR title were a bit misleading. I've updated them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-02 Thread GitBox
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r280451532
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/pom.xml
 ##
 @@ -0,0 +1,121 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.8-SNAPSHOT
 
 Review comment:
   Sorry for the long review time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-02 Thread GitBox
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r280450967
 
 

 ##
 File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java
 ##
 @@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeIdsForCheckpoint;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint;
+import org.apache.flink.streaming.connectors.gcp.pubsub.common.Acknowledger;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.auth.Credentials;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
+import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.google.cloud.pubsub.v1.SubscriptionAdminSettings.defaultCredentialsProviderBuilder;
+import static java.util.Collections.emptyList;
+
+/**
+ * PubSub Source, this Source will consume PubSub messages from a subscription 
and Acknowledge them on the next checkpoint.
+ * This ensures every message will get acknowledged at least once.
+ */
+public class PubSubSource extends RichSourceFunction
+   implements Acknowledger, ResultTypeQueryable, 
ParallelSourceFunction, CheckpointListener, 
ListCheckpointed> {
+   private static final Logger LOG = 
LoggerFactory.getLogger(PubSubSource.class);
+   protected final PubSubDeserializationSchema deserializationSchema;
+   protected final PubSubSubscriberFactory pubSubSubscriberFactory;
+   protected final Credentials credentials;
+   protected final String projectSubscriptionName;
+   protected final int maxMessagesPerPull;
+   protected final int maxMessagesToAcknowledge;
+   protected final AcknowledgeOnCheckpointFactory 
acknowledgeOnCheckpointFactory;
+
+   protected transient AcknowledgeOnCheckpoint 
acknowledgeOnCheckpoint;
+   protected transient SubscriberStub subscriber;
+   protected transient PullRequest pullRequest;
+   protected transient EventLoopGroup eventLoopGroup;
+
+   protected transient volatile boolean isRunning;
+   protected transient volatile ApiFuture messagesFuture;
+
+   PubSubSource(PubSubDeserializationSchema deserializationSchema, 
PubSubSubscriberFactory pubSubSubscriberFactory, Credentials credentials, 
String projectSubscriptionName, int maxMessagesPerPull, int 
maxMessagesToAcknowledge, AcknowledgeOnCheckpointFactory 
acknowledgeOnCheckpointFactory) {
+  

[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-02 Thread GitBox
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r280449150
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,144 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add 
the
+following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-gcp-pubsub{{ site.scala_version_suffix 
}}
+  {{ site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+## Consuming or Producing PubSubMessages
+
+The connector provides a connectors for receiving and sending messages from 
and to Google PubSub.
+Google PubSub has an `Atleast-Once` guarantee and as such the connector 
delivers the same guarantees.
+
+### PubSub SourceFunction
+
+The class `PubSubSource` has a builder to create PubSubsources: 
`PubSubSource.newBuilder(...)`
+
+There are several optional methods to alter how the PubSubSource is created, 
the bare minimum is to provide a Google project, Pubsub subscription and a way 
to deserialize the PubSubMessages.
+
+Example:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment streamExecEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+DeserializationSchema deserializer = (...);
 
 Review comment:
   the connector now uses a `PubSubDeserializationSchema` (see comments on it 
below)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280432286
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -392,43 +379,10 @@ public JobMaster(
final JobVertexID vertexID,
final ExecutionAttemptID executionAttempt) {
 
-   final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
-   if (execution == null) {
-   // can happen when JobManager had already unregistered 
this execution upon on task failure,
-   // but TaskManager get some delay to aware of that 
situation
-   if (log.isDebugEnabled()) {
-   log.debug("Can not find Execution for attempt 
{}.", executionAttempt);
-   }
-   // but we should TaskManager be aware of this
-   return FutureUtils.completedExceptionally(new 
Exception("Can not find Execution for attempt " + executionAttempt));
-   }
-
-   final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
-   if (vertex == null) {
-   log.error("Cannot find execution vertex for vertex ID 
{}.", vertexID);
-   return FutureUtils.completedExceptionally(new 
Exception("Cannot find execution vertex for vertex ID " + vertexID));
-   }
-
-   if (vertex.getSplitAssigner() == null) {
-   log.error("No InputSplitAssigner for vertex ID {}.", 
vertexID);
-   return FutureUtils.completedExceptionally(new 
Exception("No InputSplitAssigner for vertex ID " + vertexID));
-   }
-
-   final InputSplit nextInputSplit = execution.getNextInputSplit();
-
-   if (log.isDebugEnabled()) {
-   log.debug("Send next input split {}.", nextInputSplit);
-   }
-
try {
-   final byte[] serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
-   return CompletableFuture.completedFuture(new 
SerializedInputSplit(serializedInputSplit));
-   } catch (Exception ex) {
-   log.error("Could not serialize the next input split of 
class {}.", nextInputSplit.getClass(), ex);
-   IOException reason = new IOException("Could not 
serialize the next input split of class " +
-   nextInputSplit.getClass() + ".", ex);
-   vertex.fail(reason);
-   return FutureUtils.completedExceptionally(reason);
+   return 
CompletableFuture.completedFuture(schedulerNG.requestNextInputSplit(vertexID, 
executionAttempt));
+   } catch (IOException e) {
 
 Review comment:
   I would also add a logging statement for `e`, because the exception might leave the
   distributed component here.
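   Something along these lines (a sketch; the message wording is illustrative):
   ```
   try {
       return CompletableFuture.completedFuture(
           schedulerNG.requestNextInputSplit(vertexID, executionAttempt));
   } catch (IOException e) {
       log.error("Error while requesting next input split for vertex {}.", vertexID, e);
       return FutureUtils.completedExceptionally(e);
   }
   ```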


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-02 Thread GitBox
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r280448443
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,144 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add 
the
+following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-gcp-pubsub{{ site.scala_version_suffix 
}}
+  {{ site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html)
+for information about how to package the program with the libraries for
+cluster execution.
 
 Review comment:
   Add a note that this connector is new and hasn't received widespread testing 
yet.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-02 Thread GitBox
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r280448936
 
 

 ##
 File path: docs/dev/connectors/pubsub.md
 ##
 @@ -0,0 +1,144 @@
+---
+title: "Google PubSub"
+nav-title: PubSub
+nav-parent_id: connectors
+nav-pos: 7
+---
+
+
+This connector provides a Source and Sink that can read from and write to
+[Google PubSub](https://cloud.google.com/pubsub). To use this connector, add 
the
+following dependency to your project:
+
+{% highlight xml %}
+
+  org.apache.flink
+  flink-connector-gcp-pubsub{{ site.scala_version_suffix 
}}
+  {{ site.version }}
+
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here]({{ site.baseurl }}/dev/projectsetup/dependencies.html)
+for information about how to package the program with the libraries for
+cluster execution.
+
+## Consuming or Producing PubSubMessages
+
+The connector provides a connectors for receiving and sending messages from 
and to Google PubSub.
+Google PubSub has an `Atleast-Once` guarantee and as such the connector 
delivers the same guarantees.
 
 Review comment:
   ```suggestion
   Google PubSub has an `at-least-once` guarantee and as such the connector 
delivers the same guarantees.
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-02 Thread GitBox
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r280449898
 
 

 ##
 File path: 
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/common/PubSubDeserializationSchema.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.gcp.pubsub.common;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the PubsubMessages
+ * into data types (Java/Scala objects) that are processed by Flink.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
 Review comment:
   What's the benefit of using this schema instead of the 
`DeserializationSchema`, shared by the other connectors?
   
   I think it would be a lot better to use the same deserialization schema as 
the other connectors, so that people can re-use existing implementations.
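   For example, reuse could be enabled with a thin adapter, assuming the interface ends up
   exposing a `deserialize(PubsubMessage)` and a `getProducedType()` method (sketch only):
   ```
   /** Sketch: lets existing DeserializationSchema implementations be reused for PubSub. */
   class DeserializationSchemaWrapper<T> implements PubSubDeserializationSchema<T> {

       private final DeserializationSchema<T> delegate;

       DeserializationSchemaWrapper(DeserializationSchema<T> delegate) {
           this.delegate = delegate;
       }

       @Override
       public T deserialize(PubsubMessage message) throws IOException {
           // PubsubMessage carries its payload as a ByteString
           return delegate.deserialize(message.getData().toByteArray());
       }

       @Override
       public TypeInformation<T> getProducedType() {
           return delegate.getProducedType();
       }
   }
   ```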


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-05-02 Thread GitBox
rmetzger commented on a change in pull request #6594: [FLINK-9311] [pubsub] 
Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#discussion_r280454402
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_streaming_gcp_pubsub.sh
 ##
 @@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cd "${END_TO_END_DIR}/flink-connector-gcp-pubsub-emulator-tests"
+
+mvn test -DskipTests=false
 
 Review comment:
   for me, the tests didn't execute:
   ```
   ➜  flink-connector-gcp-pubsub-emulator-tests git:(pr6594) ✗ mvn test 
-DskipTests=false
   [INFO] Scanning for projects...
   [INFO]
   [INFO] -< org.apache.flink:flink-connector-gcp-pubsub-emulator-tests 
>-
   [INFO] Building flink-connector-gcp-pubsub-emulator-tests 1.9-SNAPSHOT
   [INFO] [ jar 
]-
   [INFO]
   [INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ 
flink-connector-gcp-pubsub-emulator-tests ---
   [INFO]
   [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven-version) @ 
flink-connector-gcp-pubsub-emulator-tests ---
   [INFO]
   [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven) @ 
flink-connector-gcp-pubsub-emulator-tests ---
   [INFO]
   [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-versions) @ 
flink-connector-gcp-pubsub-emulator-tests ---
   [INFO]
   [INFO] --- directory-maven-plugin:0.1:highest-basedir (directories) @ 
flink-connector-gcp-pubsub-emulator-tests ---
   [INFO] Highest basedir set to: /Users/robert/Projects/flink
   [INFO]
   [INFO] --- maven-remote-resources-plugin:1.5:process 
(process-resource-bundles) @ flink-connector-gcp-pubsub-emulator-tests ---
   [INFO]
   [INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ 
flink-connector-gcp-pubsub-emulator-tests ---
   [INFO] Using 'UTF-8' encoding to copy filtered resources.
   [INFO] skip non existing resourceDirectory 
/Users/robert/Projects/flink/flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/main/resources
   [INFO] Copying 3 resources
   [INFO]
   [INFO] --- maven-compiler-plugin:3.8.0:compile (default-compile) @ 
flink-connector-gcp-pubsub-emulator-tests ---
   [INFO] No sources to compile
   [INFO]
   [INFO] --- maven-resources-plugin:3.1.0:testResources 
(default-testResources) @ flink-connector-gcp-pubsub-emulator-tests ---
   [INFO] Using 'UTF-8' encoding to copy filtered resources.
   [INFO] Copying 1 resource
   [INFO] Copying 3 resources
   [INFO]
   [INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ 
flink-connector-gcp-pubsub-emulator-tests ---
   [INFO] Nothing to compile - all classes are up to date
   [INFO] 

   [INFO] BUILD SUCCESS
   [INFO] 

   [INFO] Total time:  7.389 s
   [INFO] Finished at: 2019-05-02T16:47:04+02:00
   [INFO] 

   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280434081
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -582,26 +458,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) 
{
final String registrationName,
final KvStateID kvStateId,
final InetSocketAddress kvStateServerAddress) {
-   if (jobGraph.getJobID().equals(jobId)) {
-   if (log.isDebugEnabled()) {
-   log.debug("Key value state registered for job 
{} under name {}.",
-   jobGraph.getJobID(), registrationName);
-   }
-
-   try {
-   
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
-   jobVertexId, keyGroupRange, 
registrationName, kvStateId, kvStateServerAddress);
 
-   return 
CompletableFuture.completedFuture(Acknowledge.get());
-   } catch (Exception e) {
-   log.error("Failed to notify KvStateRegistry 
about registration {}.", registrationName, e);
-   return FutureUtils.completedExceptionally(e);
-   }
-   } else {
-   if (log.isDebugEnabled()) {
-   log.debug("Notification about key-value state 
registration for unknown job {} received.", jobId);
-   }
-   return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+   try {
+   schedulerNG.notifyKvStateRegistered(jobId, jobVertexId, 
keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   } catch (FlinkJobNotFoundException e) {
 
 Review comment:
   same here


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280432648
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -437,43 +391,20 @@ public JobMaster(
final IntermediateDataSetID intermediateResultId,
final ResultPartitionID resultPartitionId) {
 
-   final Execution execution = 
executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
-   if (execution != null) {
-   return 
CompletableFuture.completedFuture(execution.getState());
-   }
-   else {
-   final IntermediateResult intermediateResult =
-   
executionGraph.getAllIntermediateResults().get(intermediateResultId);
-
-   if (intermediateResult != null) {
-   // Try to find the producing execution
-   Execution producerExecution = intermediateResult
-   
.getPartitionById(resultPartitionId.getPartitionId())
-   .getProducer()
-   .getCurrentExecutionAttempt();
-
-   if 
(producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
-   return 
CompletableFuture.completedFuture(producerExecution.getState());
-   } else {
-   return 
FutureUtils.completedExceptionally(new 
PartitionProducerDisposedException(resultPartitionId));
-   }
-   } else {
-   return FutureUtils.completedExceptionally(new 
IllegalArgumentException("Intermediate data set with ID "
-   + intermediateResultId + " not 
found."));
-   }
+   try {
+   return 
CompletableFuture.completedFuture(schedulerNG.requestPartitionState(intermediateResultId,
 resultPartitionId));
+   } catch (PartitionProducerDisposedException e) {
 
 Review comment:
   I would also log the exception here before it leaves the component.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280447981
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
 ##
 @@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import 

[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280453988
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
 ##
 @@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import 

[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280453953
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
 ##
 @@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import 

[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280434680
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -809,80 +625,18 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
final boolean advanceToEndOfEventTime,
final Time timeout) {
 
-   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-
-   if (checkpointCoordinator == null) {
-   return FutureUtils.completedExceptionally(new 
IllegalStateException(
-   String.format("Job %s is not a 
streaming job.", jobGraph.getJobID(;
-   }
-
-   if (targetDirectory == null && 
!checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
-   log.info("Trying to cancel job {} with savepoint, but 
no savepoint directory configured.", jobGraph.getJobID());
-
-   return FutureUtils.completedExceptionally(new 
IllegalStateException(
-   "No savepoint directory configured. You 
can either specify a directory " +
-   "while cancelling via 
-s :targetDirectory or configure a cluster-wide " +
-   "default via key '" + 
CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
-   }
-
-   final long now = System.currentTimeMillis();
-
-   // we stop the checkpoint coordinator so that we are guaranteed
-   // to have only the data of the synchronous savepoint committed.
-   // in case of failure, and if the job restarts, the coordinator
-   // will be restarted by the CheckpointCoordinatorDeActivator.
-   checkpointCoordinator.stopCheckpointScheduler();
-
-   final CompletableFuture savepointFuture = 
checkpointCoordinator
-   .triggerSynchronousSavepoint(now, 
advanceToEndOfEventTime, targetDirectory)
-   .handleAsync((completedCheckpoint, throwable) 
-> {
-   if (throwable != null) {
-   log.info("Failed during 
stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), 
throwable.getMessage());
-   throw new 
CompletionException(throwable);
-   }
-   return 
completedCheckpoint.getExternalPointer();
-   }, getMainThreadExecutor());
-
-   final CompletableFuture terminationFuture = 
executionGraph
-   .getTerminationFuture()
-   .handleAsync((jobstatus, throwable) -> {
-
-   if (throwable != null) {
-   log.info("Failed during 
stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), 
throwable.getMessage());
-   throw new 
CompletionException(throwable);
-   } else if(jobstatus != 
JobStatus.FINISHED) {
-   log.info("Failed during 
stopping job {} with a savepoint. Reason: Reached state {} instead of 
FINISHED.", jobGraph.getJobID(), jobstatus);
-   throw new 
CompletionException(new FlinkException("Reached state " + jobstatus + " instead 
of FINISHED."));
-   }
-   return jobstatus;
-   }, getMainThreadExecutor());
-
-   return savepointFuture.thenCompose((path) ->
-   terminationFuture.thenApply((jobStatus -> path)));
-   }
-
-   private void startCheckpointScheduler(final CheckpointCoordinator 
checkpointCoordinator) {
-   if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
-   try {
-   
checkpointCoordinator.startCheckpointScheduler();
-   } catch (IllegalStateException ignored) {
-   // Concurrent shut down of the coordinator
-   }
-   }
+   return schedulerNG.stopWithSavepoint(targetDirectory, 
advanceToEndOfEventTime);
}
 
@Override
	public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(final JobVertexID jobVertexId) {
-   final ExecutionJobVertex jobVertex = 

[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280438925
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -1006,101 +760,58 @@ private Acknowledge suspendExecution(final Exception 
cause) {
return Acknowledge.get();
}
 
-   private void assignExecutionGraph(
-   ExecutionGraph newExecutionGraph,
+   private void assignScheduler(
+   SchedulerNG newScheduler,
JobManagerJobMetricGroup newJobManagerJobMetricGroup) {
validateRunsInMainThread();
-   checkState(executionGraph.getState().isTerminalState());
+   checkState(schedulerNG.requestJobStatus().isTerminalState());
checkState(jobManagerJobMetricGroup == null);
 
-   executionGraph = newExecutionGraph;
+   schedulerNG = newScheduler;
jobManagerJobMetricGroup = newJobManagerJobMetricGroup;
}
 
-   private void resetAndScheduleExecutionGraph() throws Exception {
+   private void resetAndStartScheduler() throws Exception {
validateRunsInMainThread();
 
-   final CompletableFuture<Void> executionGraphAssignedFuture;
+   final CompletableFuture<Void> schedulerAssignedFuture;
 
-   if (executionGraph.getState() == JobStatus.CREATED) {
-   executionGraphAssignedFuture = 
CompletableFuture.completedFuture(null);
-   executionGraph.start(getMainThreadExecutor());
+   if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
 
 Review comment:
   This should be gone once https://issues.apache.org/jira/browse/FLINK-11719 
has been resolved.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280433723
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -501,76 +432,21 @@ public void acknowledgeCheckpoint(
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
 
-   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-   final AcknowledgeCheckpoint ackMessage = new 
AcknowledgeCheckpoint(
-   jobID,
-   executionAttemptID,
-   checkpointId,
-   checkpointMetrics,
-   checkpointState);
-
-   if (checkpointCoordinator != null) {
-   getRpcService().execute(() -> {
-   try {
-   
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
-   } catch (Throwable t) {
-   log.warn("Error while processing 
checkpoint acknowledgement message", t);
-   }
-   });
-   } else {
-   String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
-   if (executionGraph.getState() == JobStatus.RUNNING) {
-   log.error(errorMessage, jobGraph.getJobID());
-   } else {
-   log.debug(errorMessage, jobGraph.getJobID());
-   }
-   }
+   schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, 
checkpointId, checkpointMetrics, checkpointState);
}
 
// TODO: This method needs a leader session ID
@Override
public void declineCheckpoint(DeclineCheckpoint decline) {
-   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-
-   if (checkpointCoordinator != null) {
-   getRpcService().execute(() -> {
-   try {
-   
checkpointCoordinator.receiveDeclineMessage(decline);
-   } catch (Exception e) {
-   log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
-   }
-   });
-   } else {
-   String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
-   if (executionGraph.getState() == JobStatus.RUNNING) {
-   log.error(errorMessage, jobGraph.getJobID());
-   } else {
-   log.debug(errorMessage, jobGraph.getJobID());
-   }
-   }
+   schedulerNG.declineCheckpoint(decline);
}
 
@Override
	public CompletableFuture<KvStateLocation> requestKvStateLocation(final JobID jobId, final String registrationName) {
-   // sanity check for the correct JobID
-   if (jobGraph.getJobID().equals(jobId)) {
-   if (log.isDebugEnabled()) {
-   log.debug("Lookup key-value state for job {} 
with registration " +
-   "name {}.", jobGraph.getJobID(), 
registrationName);
-   }
-
-   final KvStateLocationRegistry registry = 
executionGraph.getKvStateLocationRegistry();
-   final KvStateLocation location = 
registry.getKvStateLocation(registrationName);
-   if (location != null) {
-   return 
CompletableFuture.completedFuture(location);
-   } else {
-   return FutureUtils.completedExceptionally(new 
UnknownKvStateLocation(registrationName));
-   }
-   } else {
-   if (log.isDebugEnabled()) {
-   log.debug("Request of key-value state location 
for unknown job {} received.", jobId);
-   }
-   return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+   try {
+   return 
CompletableFuture.completedFuture(schedulerNG.requestKvStateLocation(jobId, 
registrationName));
+   } catch (UnknownKvStateLocation | FlinkJobNotFoundException e) {
 
 Review comment:
   logging statement missing
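   A minimal sketch of what the requested logging could look like (log level and wording are assumptions, mirroring the diagnostics the removed code used to emit):
   ```
	try {
		return CompletableFuture.completedFuture(schedulerNG.requestKvStateLocation(jobId, registrationName));
	} catch (UnknownKvStateLocation | FlinkJobNotFoundException e) {
		// log the failed lookup before completing the future exceptionally
		log.info("Error while requesting key-value state location for job {}.", jobId, e);
		return FutureUtils.completedExceptionally(e);
	}
   ```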


This is 

[GitHub] [flink] tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it

2019-05-02 Thread GitBox
tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280434123
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -611,26 +473,11 @@ public void declineCheckpoint(DeclineCheckpoint decline) 
{
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName) {
-   if (jobGraph.getJobID().equals(jobId)) {
-   if (log.isDebugEnabled()) {
-   log.debug("Key value state unregistered for job 
{} under name {}.",
-   jobGraph.getJobID(), registrationName);
-   }
-
-   try {
-   
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
-   jobVertexId, keyGroupRange, 
registrationName);
-
-   return 
CompletableFuture.completedFuture(Acknowledge.get());
-   } catch (Exception e) {
-   log.error("Failed to notify KvStateRegistry 
about unregistration {}.", registrationName, e);
-   return FutureUtils.completedExceptionally(e);
-   }
-   } else {
-   if (log.isDebugEnabled()) {
-   log.debug("Notification about key-value state 
deregistration for unknown job {} received.", jobId);
-   }
-   return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+   try {
+   schedulerNG.notifyKvStateUnregistered(jobId, 
jobVertexId, keyGroupRange, registrationName);
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   } catch (FlinkJobNotFoundException e) {
 
 Review comment:
   and here


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode

2019-05-02 Thread GitBox
lamber-ken edited a comment on issue #8254: [FLINK-12219][runtime] Yarn 
application can't stop when flink job failed in per-job yarn cluste mode
URL: https://github.com/apache/flink/pull/8254#issuecomment-488702391
 
 
   hi, @tillrohrmann,  
   
   Sorry, it's ok. I just thought you had only added a future utility just now, but you had fixed the issue in #8334 too.
   
   Best.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode

2019-05-02 Thread GitBox
lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application 
can't stop when flink job failed in per-job yarn cluste mode
URL: https://github.com/apache/flink/pull/8254#issuecomment-488702391
 
 
   hi, @tillrohrmann,
   
   Sorry, it's ok. I just thought you had only added a future utility just now, but you had fixed the issue in #8334 too.
   
   Best.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-12393) Add the user-facing classes of the new type system

2019-05-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-12393:


 Summary: Add the user-facing classes of the new type system
 Key: FLINK-12393
 URL: https://issues.apache.org/jira/browse/FLINK-12393
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


FLINK-12253 introduces logical types that will be used mostly internally. Users 
will use the {{DataType}} stack described in FLIP-37. This issue describes the 
class hierarchy around this class.
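
A hypothetical sketch of what such user-facing declarations could look like (the {{DataTypes}} factory names below are assumptions based on FLIP-37, not a finalized API):

{code:java}
// illustrative only; the exact factory methods are defined by FLIP-37
DataType name = DataTypes.VARCHAR(255);
DataType amount = DataTypes.DECIMAL(10, 2).notNull();
DataType order = DataTypes.ROW(
    DataTypes.FIELD("name", name),
    DataTypes.FIELD("amount", amount));
{code}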



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] lamber-ken commented on issue #8334: [FLINK-12219] Add utility to check for normal future completion

2019-05-02 Thread GitBox
lamber-ken commented on issue #8334: [FLINK-12219] Add utility to check for 
normal future completion
URL: https://github.com/apache/flink/pull/8334#issuecomment-488700701
 
 
   @tillrohrmann, LGTM.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-02 Thread GitBox
flinkbot commented on issue #8335: [FLINK-12253][table-common] Setup a class 
hierarchy for the new logical type system
URL: https://github.com/apache/flink/pull/8335#issuecomment-488698361
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application can't stop when flink job failed in per-job yarn cluste mode

2019-05-02 Thread GitBox
lamber-ken commented on issue #8254: [FLINK-12219][runtime] Yarn application 
can't stop when flink job failed in per-job yarn cluste mode
URL: https://github.com/apache/flink/pull/8254#issuecomment-488698323
 
 
   hi, @tillrohrmann, I think your fix is fantastic 
[tillrohrmann@524ead4](https://github.com/tillrohrmann/flink/commit/524ead44f68ff02d52e8ff561c9e235e5b353123).
   
   Like [FLINK-12247](https://github.com/apache/flink/pull/8250), we have used Flink from 1.2.1 to 1.6.3 and have been puzzled by this problem for a long time: it only shows up once the number of failed job attempts reaches the max attempt history size, and there is no error log from which we can find out why the Flink job failed but the YARN application did not stop in detached mode.
   
   Maybe other users have hit the same bug but did not mention or fix it. So I think it's better to log the error information and stop the application, as in your solution [tillrohrmann@524ead4](https://github.com/tillrohrmann/flink/commit/524ead44f68ff02d52e8ff561c9e235e5b353123).
   
   If we don't do that, the YARN application will not stop whenever an unexpected exception happens, and users have to find out the reason by themselves, which is hard for them.
   
   Best.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12253) Setup a class hierarchy for the new logical type system

2019-05-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12253:
---
Labels: pull-request-available  (was: )

> Setup a class hierarchy for the new logical type system
> ---
>
> Key: FLINK-12253
> URL: https://issues.apache.org/jira/browse/FLINK-12253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Setup a new class hierarchy around {{LogicalType}} in {{table-common}}.
> The classes implement the types listed in the table of FLIP-37.
> The classes won't be connected to the API yet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] twalthr opened a new pull request #8335: [FLINK-12253][table-common] Setup a class hierarchy for the new logical type system

2019-05-02 Thread GitBox
twalthr opened a new pull request #8335: [FLINK-12253][table-common] Setup a 
class hierarchy for the new logical type system
URL: https://github.com/apache/flink/pull/8335
 
 
   ## What is the purpose of the change
   
   This PR implements all logical types that Flink's type system should support 
in the near future. As the name indicates, these type classes are purely 
logical to allow declarations and intentions across modules and in the API. 
Planners/runtime can still decide which type and precision is supported 
physically.
   
   An exact listing can be found in FLIP-37: 
https://docs.google.com/document/d/1a9HUb6OaBIoj9IRfbILcMFPrOL7ALeZ3rVI66dvA2_U/edit#
   
   The most important content of this PR are the JavaDoc comments that clearly 
define each type and should avoid ambiguity. Catalogs/connectors/planners 
should adapt to those definitions.
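   
   As a rough illustration, every logical type carries nullability plus type-specific parameters such as length or precision (a sketch; the class names follow the FLIP-37 listing, the constructor signatures are assumptions):
   ```
   // purely logical declarations; planners/runtime decide what is supported physically
   LogicalType name = new VarCharType(255);      // VARCHAR(255), nullable by default
   LogicalType amount = new DecimalType(10, 2);  // DECIMAL(10, 2)
   LogicalType count = new IntType(false);       // INT NOT NULL
   ```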
   
   ## Brief change log
   
   - 27 logical types added
   
   ## Verifying this change
   
   - `org.apache.flink.table.types.LogicalTypesTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? JavaDocs
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12253) Setup a class hierarchy for the new logical type system

2019-05-02 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-12253:
-
Description: 
Setup a new class hierarchy around {{LogicalType}} in {{table-common}}.

The classes implement the types listed in the table of FLIP-37.

The classes won't be connected to the API yet.

  was:
Setup a new class hierarchy around {{DataType}} and {{LogicalType}} in 
{{table-common}}.

The classes implement the types listed in the table of FLIP-37.

The classes won't be connected to the API yet.


> Setup a class hierarchy for the new logical type system
> ---
>
> Key: FLINK-12253
> URL: https://issues.apache.org/jira/browse/FLINK-12253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> Setup a new class hierarchy around {{LogicalType}} in {{table-common}}.
> The classes implement the types listed in the table of FLIP-37.
> The classes won't be connected to the API yet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12253) Setup a class hierarchy for the new logical type system

2019-05-02 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-12253:
-
Summary: Setup a class hierarchy for the new logical type system  (was: 
Setup a class hierarchy for the new type system)

> Setup a class hierarchy for the new logical type system
> ---
>
> Key: FLINK-12253
> URL: https://issues.apache.org/jira/browse/FLINK-12253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> Setup a new class hierarchy around {{DataType}} and {{LogicalType}} in 
> {{table-common}}.
> The classes implement the types listed in the table of FLIP-37.
> The classes won't be connected to the API yet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280416137
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for streaming job which will schedule 
all tasks at the same time.
+ */
+public class EagerSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
+
+   public EagerSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   }
+
+   @Override
+   public void startScheduling() {
+   // Schedule all the vertices in scheduling topology at the same 
time.
 
 Review comment:
   I think this comment is not needed as it is already covered in the class' 
javadoc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279634373
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingVertex.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+
+/**
+ * A Simple scheduling vertex for testing purposes.
+ */
+public class TestingSchedulingVertex implements SchedulingVertex {
+
+   private final ExecutionVertexID executionVertexId;
+
+   public TestingSchedulingVertex(JobVertexID jobVertexId, int 
subtaskIndex) {
+   this.executionVertexId = new ExecutionVertexID(jobVertexId, 
subtaskIndex);
+   }
+
+   @Override
+   public ExecutionVertexID getId() {
+   return executionVertexId;
+   }
+
+   @Override
+   public ExecutionState getState() {
+   return ExecutionState.CREATED;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return executionVertexId.getJobVertexId();
+   }
+
+   @Override
+   public Collection 
getConsumedResultPartitions() {
+   return null;
+   }
+
+   @Override
+   public Collection 
getProducedResultPartitions() {
+   return null;
+   }
 
 Review comment:
   Formatting looks off here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280435403
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link EagerSchedulingStrategy}.
+ */
+public class EagerSchedulingStrategyTest extends TestLogger {
+
+   /**
+* Tests that when start scheduling eager scheduling strategy will 
start all vertices in scheduling topology.
+*/
+   @Test
+   public void testStartScheduling() {
+   TestingSchedulingTopology testingSchedulingTopology = new 
TestingSchedulingTopology();
+   JobVertexID jobVertexID = new JobVertexID();
+   testingSchedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertexID, 0));
+   testingSchedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertexID, 1));
+   testingSchedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertexID, 2));
+   testingSchedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertexID, 3));
+   testingSchedulingTopology.addSchedulingVertex(new 
TestingSchedulingVertex(jobVertexID, 4));
+
+   TestingSchedulerOperation testingSchedulerOperation = new 
TestingSchedulerOperation();
+   EagerSchedulingStrategy schedulingStrategy = new 
EagerSchedulingStrategy(
+   testingSchedulerOperation,
+   testingSchedulingTopology,
+   null);
+
+   schedulingStrategy.startScheduling();
+
+   assertEquals(5, 
testingSchedulerOperation.getScheduledVertices().size());
 
 Review comment:
   The assertions can be stricter. In particular, we should assert:
   
   1. All vertices in the topology are scheduled at once, i.e., there is only 
one call to `allocateSlotsAndDeploy()` 
   2. The deployed ExecutionVertexIDs match with the vertices in the topology.
   
   To ease testing, I think it would help to make the constructor of 
`TestingSchedulingVertex` accept the `ExecutionVertexID` directly:
   ```
   TestingSchedulingVertex(new ExecutionVertexID(...))
   ```
   
   Using [hamcrest 
matchers](https://github.com/junit-team/junit4/wiki/matchers-and-assertthat):
   
   ```
   assertThat(testingSchedulingOperations..., 
containsInAnyOrder(executionVertexId1, executionVertexId2))
   ```
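   Putting both assertions together could look roughly like this (a sketch; `getScheduledVertices()` returning one collection per `allocateSlotsAndDeploy()` call and the id extraction via `getExecutionVertexId()` are assumptions; matchers are the hamcrest ones linked above):
   ```
	final List<Collection<ExecutionVertexDeploymentOption>> deployments =
		testingSchedulerOperation.getScheduledVertices();

	// 1. all vertices were scheduled with a single call to allocateSlotsAndDeploy()
	assertThat(deployments, hasSize(1));

	// 2. the deployed ExecutionVertexIDs match the vertices in the topology
	final Set<ExecutionVertexID> deployedIds = deployments.get(0).stream()
		.map(ExecutionVertexDeploymentOption::getExecutionVertexId)
		.collect(Collectors.toSet());
	assertThat(deployedIds, containsInAnyOrder(executionVertexId1, executionVertexId2));
   ```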
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280430217
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperation.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A Simple scheduler operation for testing purposes.
+ */
+public class TestingSchedulerOperation implements SchedulerOperations {
+
+   private final List<ExecutionVertexDeploymentOption> scheduledVertices = new ArrayList<>();
 
 Review comment:
   If we make this a `List<Collection<ExecutionVertexDeploymentOption>>` our assertions in the tests can be stricter, i.e., we can assert on how many times `allocateSlotsAndDeploy` was called.
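   A sketch of that variant (field and accessor names are kept from the snippet above):
   ```
	private final List<Collection<ExecutionVertexDeploymentOption>> scheduledVertices = new ArrayList<>();

	@Override
	public void allocateSlotsAndDeploy(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
		// record each call separately so tests can count the invocations
		scheduledVertices.add(executionVertexDeploymentOptions);
	}

	public List<Collection<ExecutionVertexDeploymentOption>> getScheduledVertices() {
		return scheduledVertices;
	}
   ```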


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279718029
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for streaming job which will schedule 
all tasks at the same time.
+ */
+public class EagerSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
+
+   public EagerSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
 
 Review comment:
   Let's remove `jobGraph` from the constructor since it's not used inside 
`EagerSchedulingStrategy`.
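   The constructor would then shrink to something like (sketch):
   ```
	public EagerSchedulingStrategy(
			SchedulerOperations schedulerOperations,
			SchedulingTopology schedulingTopology) {
		this.schedulerOperations = checkNotNull(schedulerOperations);
		this.schedulingTopology = checkNotNull(schedulingTopology);
	}
   ```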


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280423763
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for streaming job which will schedule 
all tasks at the same time.
+ */
+public class EagerSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
+
+   public EagerSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology,
+   JobGraph jobGraph) {
+   this.schedulerOperations = checkNotNull(schedulerOperations);
+   this.schedulingTopology = checkNotNull(schedulingTopology);
+   }
+
+   @Override
+   public void startScheduling() {
+   // Schedule all the vertices in scheduling topology at the same time.
+   List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>();
 
 Review comment:
   Some code from here is duplicated in `restartTasks`. I would propose to 
split the methods into smaller units, e.g.,:
   ```
	@Override
	public void startScheduling() {
		final Set<ExecutionVertexID> allVertices = getAllVerticesFromTopology();
		allocateSlotsAndDeploy(allVertices);
	}

	@Override
	public void restartTasks(Set<ExecutionVertexID> verticesNeedingRestart) {
		allocateSlotsAndDeploy(verticesNeedingRestart);
	}

	private Set<ExecutionVertexID> getAllVerticesFromTopology() {
		return StreamSupport
			.stream(schedulingTopology.getVertices().spliterator(), false)
			.map(SchedulingVertex::getId)
			.collect(Collectors.toSet());
	}

	private void allocateSlotsAndDeploy(final Set<ExecutionVertexID> allVertices) {
		final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions =
			createExecutionVertexDeploymentOptions(allVertices);
		schedulerOperations.allocateSlotsAndDeploy(executionVertexDeploymentOptions);
	}

	private List<ExecutionVertexDeploymentOption> createExecutionVertexDeploymentOptions(
			final Iterable<ExecutionVertexID> verticesNeedingRestart) {
		List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions = new ArrayList<>();
		for (ExecutionVertexID executionVertexID : verticesNeedingRestart) {
			executionVertexDeploymentOptions.add(
				new ExecutionVertexDeploymentOption(executionVertexID, deploymentOption));
		}
		return executionVertexDeploymentOptions;
	}
   ```
   Feel free to adapt my proposal to your liking.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279731809
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocator.java
 ##
 @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Component responsible for assigning slots to a collection of {@link 
Execution}.
+ */
+public interface ExecutionSlotAllocator {
 
 Review comment:
   Let's add this interface under this ticket: 
https://issues.apache.org/jira/browse/FLINK-12372


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279724405
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A Simple scheduling topology for testing purposes.
 
 Review comment:
   nit: _A simple [...]_


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280319208
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingVertex.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+
+/**
+ * A Simple scheduling vertex for testing purposes.
+ */
+public class TestingSchedulingVertex implements SchedulingVertex {
+
+   private final ExecutionVertexID executionVertexId;
+
+   public TestingSchedulingVertex(JobVertexID jobVertexId, int 
subtaskIndex) {
+   this.executionVertexId = new ExecutionVertexID(jobVertexId, 
subtaskIndex);
+   }
+
+   @Override
+   public ExecutionVertexID getId() {
+   return executionVertexId;
+   }
+
+   @Override
+   public ExecutionState getState() {
+   return ExecutionState.CREATED;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return executionVertexId.getJobVertexId();
+   }
+
+   @Override
+   public Collection<SchedulingResultPartition> getConsumedResultPartitions() {
+   return null;
+   }
+
+   @Override
+   public Collection<SchedulingResultPartition> getProducedResultPartitions() {
+   return null;
 
 Review comment:
   To avoid handling `null` as a special case, we should just return 
`Collections.emptyList()` if there are no `SchedulingResultPartitions`.
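   For example (sketch of the suggested change):
   ```
	@Override
	public Collection<SchedulingResultPartition> getProducedResultPartitions() {
		// return an empty collection instead of null so callers need no special case
		return Collections.emptyList();
	}
   ```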


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279728912
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingVertex.java
 ##
 @@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Collection;
+
+/**
+ * A Simple scheduling vertex for testing purposes.
+ */
+public class TestingSchedulingVertex implements SchedulingVertex {
+
+   private final ExecutionVertexID executionVertexId;
+
+   public TestingSchedulingVertex(JobVertexID jobVertexId, int 
subtaskIndex) {
+   this.executionVertexId = new ExecutionVertexID(jobVertexId, 
subtaskIndex);
+   }
+
+   @Override
+   public ExecutionVertexID getId() {
+   return executionVertexId;
+   }
+
+   @Override
+   public ExecutionState getState() {
+   return ExecutionState.CREATED;
+   }
+
+   @Override
+   public JobVertexID getJobVertexId() {
+   return executionVertexId.getJobVertexId();
+   }
+
+   @Override
+   public Collection<SchedulingResultPartition> getConsumedResultPartitions() {
+   return null;
 
 Review comment:
   To avoid handling `null` as a special case, we should just return 
`Collections.emptyList()` if there are no `SchedulingResultPartitions`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280310271
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotExecutionVertexAssignment.java
 ##
 @@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The slot assignment for a {@link ExecutionVertex}.
+ */
+public class SlotExecutionVertexAssignment {
 
 Review comment:
   Let's add this class under this ticket: 
https://issues.apache.org/jira/browse/FLINK-12372


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279729385
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A Simple scheduling topology for testing purposes.
+ */
+public class TestingSchedulingTopology implements SchedulingTopology {
+
+   private final List<SchedulingVertex> schedulingVertices = new ArrayList<>();
+
+   @Override
+   public Iterable<SchedulingVertex> getVertices() {
+   return schedulingVertices;
+   }
+
+   @Override
+   public Optional<SchedulingVertex> getVertex(ExecutionVertexID executionVertexId)  {
+   SchedulingVertex returnVertex = null;
+   for (SchedulingVertex schedulingVertex : schedulingVertices) {
+   if (schedulingVertex.getId().equals(executionVertexId)) {
+   returnVertex = schedulingVertex;
+   break;
+   }
+   }
+   return Optional.ofNullable(returnVertex);
+   }
+
+   @Override
+   public Optional<SchedulingResultPartition> getResultPartition(
+   IntermediateResultPartitionID intermediateResultPartitionId) {
+   return Optional.ofNullable(null);
 
 Review comment:
   I think it's better to honor the contract of this method and provide a 
_"working"_ implementation:
   
   ```
	private final Map<IntermediateResultPartitionID, SchedulingResultPartition> schedulingResultPartitions = new HashMap<>();
   ```
   
   ```
	@Override
	public Optional<SchedulingResultPartition> getResultPartition(
			IntermediateResultPartitionID intermediateResultPartitionId) {
		return Optional.ofNullable(schedulingResultPartitions.get(intermediateResultPartitionId));
	}

	public void addSchedulingVertex(SchedulingVertex schedulingVertex) {
		addResultPartitions(schedulingVertex);

		// ...
	}

	private void addResultPartitions(final SchedulingVertex schedulingVertex) {
		addResultPartitions(schedulingVertex.getConsumedResultPartitions());
		addResultPartitions(schedulingVertex.getProducedResultPartitions());
	}

	private void addResultPartitions(final Collection<SchedulingResultPartition> resultPartitions) {
		for (SchedulingResultPartition schedulingResultPartition : resultPartitions) {
			schedulingResultPartitions.put(schedulingResultPartition.getId(), schedulingResultPartition);
		}
	}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280310078
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexSchedulingRequirements.java
 ##
 @@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The requirements for scheduling a {@link ExecutionVertex}.
+ */
+public class ExecutionVertexSchedulingRequirements {
 
 Review comment:
   Let's add this class under this ticket: 
https://issues.apache.org/jira/browse/FLINK-12372


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279718671
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperation.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A Simple scheduler operation for testing purposes.
+ */
+public class TestingSchedulerOperation implements SchedulerOperations {
+
+   private final List<ExecutionVertexDeploymentOption> scheduledVertices = new ArrayList<>();
+
+   @Override
+   public void allocateSlotsAndDeploy(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+   scheduledVertices.addAll(executionVertexDeploymentOptions);
+   }
+
+   public List<ExecutionVertexDeploymentOption> getScheduledVertices() {
+   return scheduledVertices;
+   }
 
 Review comment:
   Formatting looks off here.


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280430142
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperation.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A Simple scheduler operation for testing purposes.
+ */
+public class TestingSchedulerOperation implements SchedulerOperations {
+
+   private final List<ExecutionVertexDeploymentOption> scheduledVertices = new ArrayList<>();
+
+   @Override
+   public void allocateSlotsAndDeploy(Collection<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
+   scheduledVertices.addAll(executionVertexDeploymentOptions);
+   }
+
+   public List<ExecutionVertexDeploymentOption> getScheduledVertices() {
+   return scheduledVertices;
 
 Review comment:
   I would prefer to return an immutable collection here to avoid accidental 
modification.
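   For illustration, a minimal sketch of what this could look like, assuming the
   field from the snippet above (the class name here is hypothetical):

   ```
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch only: hand out a read-only view so callers cannot mutate the internal list by accident.
class TestingSchedulerOperationSketch {

	private final List<ExecutionVertexDeploymentOption> scheduledVertices = new ArrayList<>();

	public List<ExecutionVertexDeploymentOption> getScheduledVertices() {
		return Collections.unmodifiableList(scheduledVertices);
	}
}
   ```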


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r280426220
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperation.java
 ##
 @@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A Simple scheduler operation for testing purposes.
+ */
+public class TestingSchedulerOperation implements SchedulerOperations {
 
 Review comment:
   To be consistent with the interface, this should be named 
`TestingSchedulerOperations` (plural).


[GitHub] [flink] GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] Implement Eager Scheduling Strategy

2019-05-02 Thread GitBox
GJL commented on a change in pull request #8296: [FLINK-12228] [runtime] 
Implement Eager Scheduling Strategy
URL: https://github.com/apache/flink/pull/8296#discussion_r279733104
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerOperations.java
 ##
 @@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Default {@link SchedulerOperations} which will allocate slots and deploy 
the vertices when all slots are returned.
+ */
+public class DefaultSchedulerOperations implements SchedulerOperations {
 
 Review comment:
   Nice implementation, but I think it is outside the scope of FLINK-12228. I 
would propose to set this code aside and revisit it when we implement the 
actual scheduler. The changes to `Execution` should also be reverted for now 
and revisited later.

