[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-21 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r286074850
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -280,6 +315,142 @@
 */
void sqlUpdate(String stmt, QueryConfig config);
 
+   /**
+* Gets the current default catalog name of the current session.
+*
+* @return The current default catalog name that is used for the path 
resolution.
+* @see TableEnvironment#useCatalog(String)
+*/
+   String getCurrentCatalog();
+
+   /**
+* Sets the current catalog to the given value. It also sets the default
+* database to the catalog's default one. To assign both catalog and 
database explicitly
+* see {@link TableEnvironment#useDatabase(String, String)}.
+*
+* <p>This is used during the resolution of object paths. Both the catalog and database are optional
+* when referencing catalog objects (tables, views, etc.). The algorithm looks for the requested
+* objects in the following paths in that order:
+* <ul>
+*     <li>{@code [current-catalog].[current-database].[requested-path]}</li>
+*     <li>{@code [current-catalog].[requested-path]}</li>
+*     <li>{@code [requested-path]}</li>
+* </ul>
+*
+* <p>Example:
+*
+* <p>Given structure with default catalog set to {@code default-catalog} and default database set to
+* {@code default-database}.
+* <pre>
+* root:
+*   |- default-catalog
+*       |- default-database
+*           |- tab1
+*       |- db1
+*           |- tab1
+*   |- cat1
+*       |- db1
+*           |- tab1
+* </pre>
+*
+* <p>The following table describes resolved paths:
+* <table>
+*     <tr>
+*         <th>Requested path</th>
+*         <th>Resolved path</th>
+*     </tr>
+*     <tr>
+*         <td>tab1</td>
+*         <td>default-catalog.default-database.tab1</td>
+*     </tr>
+*     <tr>
+*         <td>db1.tab1</td>
+*         <td>default-catalog.db1.tab1</td>
+*     </tr>
+*     <tr>
+*         <td>cat1.db1.tab1</td>
+*         <td>cat1.db1.tab1</td>
+*     </tr>
+* </table>
+*
+* @param catalogName The name of the catalog to set as the current 
default catalog.
+* @throws CatalogException thrown if a catalog with given name could 
not be set as the default one
+*/
+   void useCatalog(String catalogName);
+
+   /**
+* Gets the current default database name of the running session.
+*
+* @return The name of the current database of the current catalog.
+* @see TableEnvironment#useDatabase(String, String)
+*/
+   String getCurrentDatabase();
+
+   /**
+* Sets the current default catalog and database. That path will be 
used as the default one
+* when looking for unqualified object names.
+*
+* <p>This is used during the resolution of object paths. Both the catalog and database are optional
+* when referencing catalog objects (tables, views, etc.). The algorithm looks for the requested
+* objects in the following paths in that order:
+* <ul>
+*     <li>{@code [current-catalog].[current-database].[requested-path]}</li>
+*     <li>{@code [current-catalog].[requested-path]}</li>
+*     <li>{@code [requested-path]}</li>
+* </ul>
+*
+* <p>Example:
+*
+* <p>Given structure with default catalog set to {@code default-catalog} and default database set to
+* {@code default-database}.
+* <pre>
+* root:
+*   |- default-catalog
+*       |- default-database
+*           |- tab1
+*       |- db1
+*           |- tab1
+*   |- cat1
+*       |- db1
+*           |- tab1
+* </pre>
+*
+* <p>The following table describes resolved paths:
+* <table>
+*     <tr>
+*         <th>Requested path</th>
+*         <th>Resolved path</th>
+*     </tr>
+*     <tr>
+*         <td>tab1</td>
+*         <td>default-catalog.default-database.tab1</td>
+*     </tr>
+*     <tr>
+*         <td>db1.tab1</td>
+*         <td>default-catalog.db1.tab1</td>
+*     </tr>
+*     <tr>
+*         <td>cat1.db1.tab1</td>
+*         <td>cat1.db1.tab1</td>
+*     </tr>
+* </table>
+*
+* @param catalogName The name of the catalog to set as the current catalog.
+* @param databaseName The name of the database to set as the current database.
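The resolution order documented in the quoted javadoc can be sketched independently of Flink. The class and method names below are illustrative only (not Flink API); a small in-memory registry stands in for the catalogs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Self-contained sketch of the documented lookup order:
// [current-catalog].[current-database].path, then [current-catalog].path,
// then the path exactly as given. First match wins.
public class PathResolution {
    // Stand-in for the catalog structure from the javadoc example.
    static final Set<List<String>> TABLES = new HashSet<>(Arrays.asList(
        Arrays.asList("default-catalog", "default-database", "tab1"),
        Arrays.asList("default-catalog", "db1", "tab1"),
        Arrays.asList("cat1", "db1", "tab1")));

    public static Optional<List<String>> resolve(
            String currentCatalog, String currentDatabase, String... requested) {
        List<List<String>> prefixes = Arrays.asList(
            Arrays.asList(currentCatalog, currentDatabase),
            Collections.singletonList(currentCatalog),
            Collections.emptyList());
        for (List<String> prefix : prefixes) {
            List<String> full = new ArrayList<>(prefix);
            full.addAll(Arrays.asList(requested));
            if (TABLES.contains(full)) {
                return Optional.of(full); // first matching prefix wins
            }
        }
        return Optional.empty();
    }
}
```

With the example structure this reproduces the resolution table above: `tab1` resolves under the current catalog and database, `db1.tab1` under the current catalog, and `cat1.db1.tab1` as a fully qualified path.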

[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-21 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r286069455
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/TestExternalTableSourceFactory.java
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+
+/**
+ * Table source factory for testing. It creates a dummy {@link TableSource}
+ * that returns an empty {@link TableSchema}.
+ */
+public class TestExternalTableSourceFactory implements TableSourceFactory<Row> {
 
 Review comment:
   None of the existing factories actually creates a `TableSource`, but one is 
needed to get a `TableSchema`. This is required for the path resolution, as 
the output of a path resolution is the fully resolved path plus the 
`TableSchema` of the entry.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-21 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r286058606
 
 

 ##
 File path: 
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
 ##
 @@ -283,9 +285,13 @@ static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment) {
	 */
	static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
		try {
-			Class<?> clazz = Class.forName("org.apache.flink.table.api.java.BatchTableEnvImpl");
-			Constructor<?> con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class);
-			return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig);
+			Class<?> clazz = Class.forName("org.apache.flink.table.api.java.BatchTableEnvImpl");
+			Constructor<?> con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
+			CatalogManager catalogManager = new CatalogManager(
+				tableConfig.getBultinCatalogName(),
 
 Review comment:
   I think both spellings are actually correct, but I will change it.




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-17 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284995835
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/PlanningConfigurationBuilder.java
 ##
 @@ -129,16 +142,48 @@ public Context getContext() {
}
 
/**
-* Creates a configured {@link FrameworkConfig} for a planning session.
-*
-* @param defaultSchema the default schema to look for first during 
planning
-* @return configured framework config
+* Returns the SQL parser config for this environment including a 
custom Calcite configuration.
 */
-   public FrameworkConfig createFrameworkConfig(SchemaPlus defaultSchema) {
+   public SqlParser.Config getSqlParserConfig() {
+   return 
JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig()).orElseGet(()
 ->
+   // we use Java lex because back ticks are easier than 
double quotes in programming
+   // and cases are preserved
+   SqlParser
+   .configBuilder()
+   .setLex(Lex.JAVA)
+   .build());
+   }
+
+   private CatalogReader createCatalogReader(
+   boolean lenientCaseSensitivity,
+   String currentCatalog,
+   String currentDatabase) {
+   SqlParser.Config sqlParserConfig = getSqlParserConfig();
+   final boolean caseSensitive;
+   if (lenientCaseSensitivity) {
+   caseSensitive = false;
+   } else {
+   caseSensitive = sqlParserConfig.caseSensitive();
+   }
+
+   SqlParser.Config parserConfig = 
SqlParser.configBuilder(sqlParserConfig)
+   .setCaseSensitive(caseSensitive)
+   .build();
+
+   return new CatalogReader(
 
 Review comment:
   I would say this is the core difference from the previous PR.
   
   We do not create a separate Calcite schema, but we use the 
`CatalogManagerCalciteSchema` as the root one (that's why we had to create the 
`CalciteSchemaBuilder`). At the same time `CatalogManagerCalciteSchema` is just 
a thin adapter around `CatalogManager`. That means any changes to 
`CatalogManager` are directly visible to Calcite.
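The "thin adapter" property described here, where mutations to the manager are immediately visible through the schema, can be illustrated with a plain-Java sketch (hypothetical names, not the actual Flink classes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch of a thin adapter: the view holds no copy of the catalogs, it
// reads through to the manager, so later registrations are immediately
// visible to any consumer of the view.
public class AdapterSketch {
    public static class Manager {
        final Map<String, String> catalogs = new HashMap<>();

        public void register(String name, String catalog) {
            catalogs.put(name, catalog);
        }
    }

    public static class SchemaView {
        private final Manager manager;

        public SchemaView(Manager manager) {
            this.manager = manager;
        }

        // Live view: delegates on every call instead of snapshotting.
        public Set<String> catalogNames() {
            return manager.catalogs.keySet();
        }
    }
}
```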




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284857074
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -19,33 +19,35 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.CatalogAlreadyExistsException;
 import org.apache.flink.table.api.CatalogNotExistException;
-import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static java.lang.String.format;
 
 Review comment:
   Anyway, after a quick scan, to remain consistent with other places, I will 
change it.




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284855245
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -19,33 +19,35 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.CatalogAlreadyExistsException;
 import org.apache.flink.table.api.CatalogNotExistException;
-import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static java.lang.String.format;
 
 Review comment:
   I rephrased my previous comment and I think we agree in principle. I 
guess the only difference is whether we assume `String#format` and `Arrays#asList` 
to be well known by the majority of Java developers.
   
   They are really broadly used in Flink's code base (and not only in Flink's) and I 
truly believe they are rarely confused.




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284853604
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -250,56 +258,64 @@ public void setCurrentDatabase(String databaseName) throws DatabaseNotExistExcep
	public Optional<CatalogTableOperation> resolveTable(String... tablePath) {
		checkArgument(tablePath != null && tablePath.length != 0, "Table path must not be null or empty.");
 
-		List<String> defaultPath = new ArrayList<>();
-		defaultPath.add(currentCatalogName);
-		defaultPath.add(currentDatabaseName);
+		List<String> userPath = asList(tablePath);
 
-		List<String> userPath = Arrays.asList(tablePath);
-		defaultPath.addAll(userPath);
+		List<List<String>> prefixes = asList(
+			asList(currentCatalogName, currentDatabaseName),
+			singletonList(currentCatalogName),
+			emptyList()
+		);
 
-		Optional<CatalogTableOperation> inDefaultPath = lookupPath(defaultPath);
-
-		if (inDefaultPath.isPresent()) {
-			return inDefaultPath;
-		} else {
-			return lookupPath(userPath);
+		for (List<String> prefix : prefixes) {
+			Optional<CatalogTableOperation> potentialTable = lookupPath(prefix, userPath);
+			if (potentialTable.isPresent()) {
+				return potentialTable;
+			}
		}
+
+		return Optional.empty();
	}
 
-	private Optional<CatalogTableOperation> lookupPath(List<String> path) {
+	private Optional<CatalogTableOperation> lookupPath(List<String> prefix, List<String> userPath) {
		try {
-			Optional<TableSchema> potentialTable = lookupCatalogTable(path);
+			List<String> path = new ArrayList<>(prefix);
+			path.addAll(userPath);
+
+			Optional<CatalogTableOperation> potentialTable = lookupCatalogTable(path);
 
 Review comment:
   Ok, I misunderstood you. 
   
   I don't like this approach, as this way both `lookupPath` and 
`lookupCatalogTable` validate the structure of the `Catalog` path. In my approach 
the whole logic is enclosed in a single lookup**CatalogTable** method.
   
   This is not performance-critical code. We do not need to optimize the 
number of function calls, but should rather optimize for readability and 
separation of concerns.
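The separation of concerns argued for here can be sketched in isolation (plain Java, simplified to three-part `catalog.database.table` paths; hypothetical names, not the actual `CatalogManager` code): `lookupPath` only concatenates the prefix with the user path, and the structural check lives solely in `lookupCatalogTable`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class LookupSketch {
    // Concatenation only; no validation of the path structure here.
    public static Optional<String> lookupPath(List<String> prefix, List<String> userPath) {
        List<String> path = new ArrayList<>(prefix);
        path.addAll(userPath);
        return lookupCatalogTable(path);
    }

    // The single place that checks the path structure: catalog.database.table.
    public static Optional<String> lookupCatalogTable(List<String> path) {
        if (path.size() != 3) {
            return Optional.empty();
        }
        return Optional.of(String.join(".", path));
    }
}
```

Keeping the check in one method means a caller trying several prefixes never has to reason about path validity itself.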




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284851991
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -19,33 +19,35 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.CatalogAlreadyExistsException;
 import org.apache.flink.table.api.CatalogNotExistException;
-import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static java.lang.String.format;
 
 Review comment:
   I think we should apply common sense here. In my experience it makes the code much 
cleaner when we statically import well-known methods or constants (like e.g. 
`String.format()` / `Assert.assertTrue` / `Collectors.toList()`). There is 
very little chance anyone will confuse 
   ```
   CatalogException(format(
	"A database with name [%s] does not exist in the catalog: [%s].",
	databaseName,
	currentCatalogName))
   ```
   with any other `format`.
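For readers unfamiliar with the feature, a minimal self-contained example of the static import in question:

```java
// With the static import, the call site reads as plain `format(...)`.
import static java.lang.String.format;

public class FormatExample {
    public static String missingDatabase(String databaseName, String catalogName) {
        return format(
            "A database with name [%s] does not exist in the catalog: [%s].",
            databaseName,
            catalogName);
    }
}
```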




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284830405
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -19,33 +19,35 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.CatalogAlreadyExistsException;
 import org.apache.flink.table.api.CatalogNotExistException;
-import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static java.lang.String.format;
 
 Review comment:
   I tend to disagree. My experience is that it is usually the other way 
round (with some rare exceptions). Static imports make the code more readable 
and maintainable.




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284825268
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -250,56 +258,64 @@ public void setCurrentDatabase(String databaseName) throws DatabaseNotExistExcep
	public Optional<CatalogTableOperation> resolveTable(String... tablePath) {
		checkArgument(tablePath != null && tablePath.length != 0, "Table path must not be null or empty.");
 
-		List<String> defaultPath = new ArrayList<>();
-		defaultPath.add(currentCatalogName);
-		defaultPath.add(currentDatabaseName);
+		List<String> userPath = asList(tablePath);
 
-		List<String> userPath = Arrays.asList(tablePath);
-		defaultPath.addAll(userPath);
+		List<List<String>> prefixes = asList(
+			asList(currentCatalogName, currentDatabaseName),
+			singletonList(currentCatalogName),
+			emptyList()
+		);
 
-		Optional<CatalogTableOperation> inDefaultPath = lookupPath(defaultPath);
-
-		if (inDefaultPath.isPresent()) {
-			return inDefaultPath;
-		} else {
-			return lookupPath(userPath);
+		for (List<String> prefix : prefixes) {
+			Optional<CatalogTableOperation> potentialTable = lookupPath(prefix, userPath);
+			if (potentialTable.isPresent()) {
+				return potentialTable;
+			}
		}
+
+		return Optional.empty();
	}
 
-	private Optional<CatalogTableOperation> lookupPath(List<String> path) {
+	private Optional<CatalogTableOperation> lookupPath(List<String> prefix, List<String> userPath) {
		try {
-			Optional<TableSchema> potentialTable = lookupCatalogTable(path);
+			List<String> path = new ArrayList<>(prefix);
+			path.addAll(userPath);
+
+			Optional<CatalogTableOperation> potentialTable = lookupCatalogTable(path);
 
 Review comment:
   External catalogs support arbitrary nesting, so we are not sure about that.




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284828531
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -250,56 +258,64 @@ public void setCurrentDatabase(String databaseName) throws DatabaseNotExistExcep
	public Optional<CatalogTableOperation> resolveTable(String... tablePath) {
		checkArgument(tablePath != null && tablePath.length != 0, "Table path must not be null or empty.");
 
-		List<String> defaultPath = new ArrayList<>();
-		defaultPath.add(currentCatalogName);
-		defaultPath.add(currentDatabaseName);
+		List<String> userPath = asList(tablePath);
 
-		List<String> userPath = Arrays.asList(tablePath);
-		defaultPath.addAll(userPath);
+		List<List<String>> prefixes = asList(
+			asList(currentCatalogName, currentDatabaseName),
+			singletonList(currentCatalogName),
+			emptyList()
+		);
 
-		Optional<CatalogTableOperation> inDefaultPath = lookupPath(defaultPath);
-
-		if (inDefaultPath.isPresent()) {
-			return inDefaultPath;
-		} else {
-			return lookupPath(userPath);
+		for (List<String> prefix : prefixes) {
+			Optional<CatalogTableOperation> potentialTable = lookupPath(prefix, userPath);
+			if (potentialTable.isPresent()) {
+				return potentialTable;
+			}
		}
+
+		return Optional.empty();
	}
 
-	private Optional<CatalogTableOperation> lookupPath(List<String> path) {
+	private Optional<CatalogTableOperation> lookupPath(List<String> prefix, List<String> userPath) {
		try {
-			Optional<TableSchema> potentialTable = lookupCatalogTable(path);
+			List<String> path = new ArrayList<>(prefix);
+			path.addAll(userPath);
+
+			Optional<CatalogTableOperation> potentialTable = lookupCatalogTable(path);
 
			if (!potentialTable.isPresent()) {
				potentialTable = lookupExternalTable(path);
			}
-			return potentialTable.map(schema -> new CatalogTableOperation(path, schema));
+			return potentialTable;
		} catch (TableNotExistException e) {
			return Optional.empty();
		}
	}
 
-	private Optional<TableSchema> lookupCatalogTable(List<String> path) throws TableNotExistException {
-		if (path.size() >= 3) {
+	private Optional<CatalogTableOperation> lookupCatalogTable(List<String> path) throws TableNotExistException {
 
 Review comment:
   Good catch!



[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284828486
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -250,56 +258,64 @@ public void setCurrentDatabase(String databaseName) throws DatabaseNotExistExcep
	public Optional<CatalogTableOperation> resolveTable(String... tablePath) {
		checkArgument(tablePath != null && tablePath.length != 0, "Table path must not be null or empty.");

-		List<String> defaultPath = new ArrayList<>();
-		defaultPath.add(currentCatalogName);
-		defaultPath.add(currentDatabaseName);
+		List<String> userPath = asList(tablePath);

-		List<String> userPath = Arrays.asList(tablePath);
-		defaultPath.addAll(userPath);
+		List<List<String>> prefixes = asList(
+			asList(currentCatalogName, currentDatabaseName),
+			singletonList(currentCatalogName),
+			emptyList()
+		);

-		Optional<CatalogTableOperation> inDefaultPath = lookupPath(defaultPath);
-
-		if (inDefaultPath.isPresent()) {
-			return inDefaultPath;
-		} else {
-			return lookupPath(userPath);
+		for (List<String> prefix : prefixes) {
+			Optional<CatalogTableOperation> potentialTable = lookupPath(prefix, userPath);
+			if (potentialTable.isPresent()) {
+				return potentialTable;
+			}
		}
+
+		return Optional.empty();
	}

-	private Optional<CatalogTableOperation> lookupPath(List<String> path) {
+	private Optional<CatalogTableOperation> lookupPath(List<String> prefix, List<String> userPath) {
		try {
-			Optional<TableSchema> potentialTable = lookupCatalogTable(path);
+			List<String> path = new ArrayList<>(prefix);
+			path.addAll(userPath);
+
+			Optional<CatalogTableOperation> potentialTable = lookupCatalogTable(path);

			if (!potentialTable.isPresent()) {
				potentialTable = lookupExternalTable(path);
			}
-			return potentialTable.map(schema -> new CatalogTableOperation(path, schema));
+			return potentialTable;
		} catch (TableNotExistException e) {
			return Optional.empty();
		}
	}

-	private Optional<TableSchema> lookupCatalogTable(List<String> path) throws TableNotExistException {
-		if (path.size() >= 3) {
+	private Optional<CatalogTableOperation> lookupCatalogTable(List<String> path) throws TableNotExistException {
+		if (path.size() == 3) {
			Catalog currentCatalog = catalogs.get(path.get(0));
			String currentDatabaseName = path.get(1);
			String tableName = String.join(".", path.subList(2, path.size()));
 
 Review comment:
   Good catch!
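The hunk above replaces the two-way default/user-path check with an ordered list of prefixes. As a standalone illustration of that resolution order (current catalog plus database first, then current catalog alone, then the path as given), here is a sketch against a hypothetical flat table registry; the names and the flat set are illustrative only, since the real `CatalogManager` consults `Catalog` and `ExternalCatalog` instances instead:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class PathResolutionSketch {
	// Hypothetical flat registry keyed by fully qualified path.
	static final Set<String> TABLES = new HashSet<>(Arrays.asList(
		"default-catalog.default-database.tab1",
		"default-catalog.db1.tab1",
		"cat1.db1.tab1"));

	static Optional<String> resolve(String currentCatalog, String currentDatabase, String... tablePath) {
		// Try the longest prefix first, falling back to shorter ones.
		List<List<String>> prefixes = Arrays.asList(
			Arrays.asList(currentCatalog, currentDatabase),
			Collections.singletonList(currentCatalog),
			Collections.<String>emptyList());
		for (List<String> prefix : prefixes) {
			List<String> full = new ArrayList<>(prefix);
			full.addAll(Arrays.asList(tablePath));
			String candidate = String.join(".", full);
			if (TABLES.contains(candidate)) {
				return Optional.of(candidate);
			}
		}
		return Optional.empty();
	}

	public static void main(String[] args) {
		// "tab1" resolves against the current catalog and database first.
		System.out.println(resolve("default-catalog", "default-database", "tab1").get());
		// "db1.tab1" falls through to the current-catalog prefix.
		System.out.println(resolve("default-catalog", "default-database", "db1", "tab1").get());
		// A fully qualified path matches with the empty prefix.
		System.out.println(resolve("default-catalog", "default-database", "cat1", "db1", "tab1").get());
	}
}
```

Note that the loop returns on the first hit, so a table shadowed by one in the current database is never reached via the shorter prefixes.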



[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284825295
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -250,56 +258,64 @@ public void setCurrentDatabase(String databaseName) 
throws DatabaseNotExistExcep
public Optional resolveTable(String... 
tablePath) {
checkArgument(tablePath != null && tablePath.length != 0, 
"Table path must not be null or empty.");
 
-   List defaultPath = new ArrayList<>();
-   defaultPath.add(currentCatalogName);
-   defaultPath.add(currentDatabaseName);
+   List userPath = asList(tablePath);
 
-   List userPath = Arrays.asList(tablePath);
-   defaultPath.addAll(userPath);
+   List> prefixes = asList(
+   asList(currentCatalogName, currentDatabaseName),
+   singletonList(currentCatalogName),
+   emptyList()
+   );
 
-   Optional inDefaultPath = 
lookupPath(defaultPath);
-
-   if (inDefaultPath.isPresent()) {
-   return inDefaultPath;
-   } else {
-   return lookupPath(userPath);
+   for (List prefix : prefixes) {
+   Optional potentialTable = 
lookupPath(prefix, userPath);
+   if (potentialTable.isPresent()) {
+   return potentialTable;
+   }
}
+
+   return Optional.empty();
}
 
-   private Optional lookupPath(List path) {
+   private Optional lookupPath(List prefix, 
List userPath) {
try {
-   Optional potentialTable = 
lookupCatalogTable(path);
+   List path = new ArrayList<>(prefix);
+   path.addAll(userPath);
+
+   Optional potentialTable = 
lookupCatalogTable(path);
 
 Review comment:
   External catalogs support arbitrary nesting.



[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284825326
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -250,56 +258,64 @@ public void setCurrentDatabase(String databaseName) throws DatabaseNotExistExcep
	public Optional<CatalogTableOperation> resolveTable(String... tablePath) {
		checkArgument(tablePath != null && tablePath.length != 0, "Table path must not be null or empty.");

-		List<String> defaultPath = new ArrayList<>();
-		defaultPath.add(currentCatalogName);
-		defaultPath.add(currentDatabaseName);
+		List<String> userPath = asList(tablePath);

-		List<String> userPath = Arrays.asList(tablePath);
-		defaultPath.addAll(userPath);
+		List<List<String>> prefixes = asList(
+			asList(currentCatalogName, currentDatabaseName),
+			singletonList(currentCatalogName),
+			emptyList()
+		);

-		Optional<CatalogTableOperation> inDefaultPath = lookupPath(defaultPath);
-
-		if (inDefaultPath.isPresent()) {
-			return inDefaultPath;
-		} else {
-			return lookupPath(userPath);
+		for (List<String> prefix : prefixes) {
+			Optional<CatalogTableOperation> potentialTable = lookupPath(prefix, userPath);
+			if (potentialTable.isPresent()) {
+				return potentialTable;
+			}
		}
+
+		return Optional.empty();
	}

-	private Optional<CatalogTableOperation> lookupPath(List<String> path) {
+	private Optional<CatalogTableOperation> lookupPath(List<String> prefix, List<String> userPath) {
		try {
-			Optional<TableSchema> potentialTable = lookupCatalogTable(path);
+			List<String> path = new ArrayList<>(prefix);
+			path.addAll(userPath);
+
+			Optional<CatalogTableOperation> potentialTable = lookupCatalogTable(path);
 
 Review comment:
   External catalogs support arbitrary nesting.


[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284825268
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -250,56 +258,64 @@ public void setCurrentDatabase(String databaseName) throws DatabaseNotExistExcep
	public Optional<CatalogTableOperation> resolveTable(String... tablePath) {
		checkArgument(tablePath != null && tablePath.length != 0, "Table path must not be null or empty.");

-		List<String> defaultPath = new ArrayList<>();
-		defaultPath.add(currentCatalogName);
-		defaultPath.add(currentDatabaseName);
+		List<String> userPath = asList(tablePath);

-		List<String> userPath = Arrays.asList(tablePath);
-		defaultPath.addAll(userPath);
+		List<List<String>> prefixes = asList(
+			asList(currentCatalogName, currentDatabaseName),
+			singletonList(currentCatalogName),
+			emptyList()
+		);

-		Optional<CatalogTableOperation> inDefaultPath = lookupPath(defaultPath);
-
-		if (inDefaultPath.isPresent()) {
-			return inDefaultPath;
-		} else {
-			return lookupPath(userPath);
+		for (List<String> prefix : prefixes) {
+			Optional<CatalogTableOperation> potentialTable = lookupPath(prefix, userPath);
+			if (potentialTable.isPresent()) {
+				return potentialTable;
+			}
		}
+
+		return Optional.empty();
	}

-	private Optional<CatalogTableOperation> lookupPath(List<String> path) {
+	private Optional<CatalogTableOperation> lookupPath(List<String> prefix, List<String> userPath) {
		try {
-			Optional<TableSchema> potentialTable = lookupCatalogTable(path);
+			List<String> path = new ArrayList<>(prefix);
+			path.addAll(userPath);
+
+			Optional<CatalogTableOperation> potentialTable = lookupCatalogTable(path);
 
 Review comment:
   External catalogs support arbitrary nesting.
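Since external catalogs can nest to arbitrary depth, a lookup cannot assume a fixed path length. A minimal sketch of descending a nested structure follows; the `Node` type and its fields are hypothetical stand-ins, not the actual `ExternalCatalog` API:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NestedLookupSketch {
	// Hypothetical nested node: sub-"catalogs" plus leaf tables.
	static class Node {
		final Map<String, Node> children = new HashMap<>();
		final Set<String> tables = new HashSet<>();
	}

	// Walk all but the last path element through nested children,
	// then check the final element against that node's tables.
	static boolean tableExists(Node root, List<String> path) {
		Node current = root;
		for (int i = 0; i < path.size() - 1; i++) {
			current = current.children.get(path.get(i));
			if (current == null) {
				return false;
			}
		}
		return current.tables.contains(path.get(path.size() - 1));
	}

	public static void main(String[] args) {
		Node root = new Node();
		Node a = new Node();
		Node b = new Node();
		root.children.put("a", a);
		a.children.put("b", b);
		b.tables.add("tab");
		System.out.println(tableExists(root, Arrays.asList("a", "b", "tab"))); // true
		System.out.println(tableExists(root, Arrays.asList("a", "tab")));      // false
	}
}
```

The walk is linear in the path length, which is why a hard-coded `path.size() == 3` check can only cover the non-external catalogs.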



[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284671140
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also 
implements the logic of
+ * table path resolution. Supports both new API ({@link Catalog} as well as 
{@link ExternalCatalog}).
+ */
+@Internal
+public class CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
+
+   // A map between names and catalogs.
+   private Map<String, Catalog> catalogs;
+
+   // TO BE REMOVED along with ExternalCatalog API
+   private Map<String, ExternalCatalog> externalCatalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   private String currentDatabaseName;
+
+   public CatalogManager(String defaultCatalogName, Catalog 
defaultCatalog) {
+   checkArgument(
+   !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
+   "Default catalog name cannot be null or empty");
+   checkNotNull(defaultCatalog, "Default catalog cannot be null");
+   catalogs = new LinkedHashMap<>();
+   externalCatalogs = new LinkedHashMap<>();
+   catalogs.put(defaultCatalogName, defaultCatalog);
+   this.currentCatalogName = defaultCatalogName;
+   this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
+   }
+
+   /**
+* Registers a catalog under the given name. The catalog name must be 
unique across both
+* {@link Catalog}s and {@link ExternalCatalog}s.
+*
+* @param catalogName name under which to register the given catalog
+* @param catalog catalog to register
+* @throws CatalogException if the registration of the catalog under 
the given name failed
+*/
+   public void registerCatalog(String catalogName, Catalog catalog) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"Catalog name cannot be null or empty.");
+   checkNotNull(catalog, "Catalog cannot be null");
+
+   if (catalogs.containsKey(catalogName) || 
externalCatalogs.containsKey(catalogName)) {
+   throw new CatalogException(format("Catalog %s already 
exists.", catalogName));
+   }
+
+   catalogs.put(catalogName, catalog);
+   catalog.open();
+   }
+
+   /**
+* Gets a catalog by name.
+*
+* @param catalogName name of the catalog to retrieve
+* @return the requested catalog or empty if it does not exist
+* @see CatalogManager#getExternalCatalog(String)
+*/
+   public Optional<Catalog> getCatalog(String catalogName) {
+   return Optional.ofNullable(catalogs.get(catalogName));
+   }
+
+   /**
+* Registers an external catalog under the given name. The catalog name 
must be unique across both
+* 

[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284582495
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -280,6 +317,54 @@
 */
void sqlUpdate(String stmt, QueryConfig config);
 
+   /**
+* Gets the current default catalog name of the current session.
+*
+* @return The current default catalog name that is used for the path 
resolution.
+* @see TableEnvironment#useCatalog(String)
+*/
+   String getCurrentCatalog();
+
+   /**
+* Sets the current catalog to the given value. It also sets the default
+* database to the catalog's default one. To assign both catalog and 
database explicitly
+* see {@link TableEnvironment#useDatabase(String, String)}.
+*
+* This is used during the resolution of object paths. The default path is constructed as
+* {@code [current-catalog].[current-database]}. During the resolution, we first look for
+* {@code [default-path].[object-path]}; if no object is found, we assume the object path is a
+* fully qualified one and look under {@code [object-path]}.
+*
+* @param catalogName The name of the catalog to set as the current 
default catalog.
+* @throws CatalogException thrown if a catalog with given name could 
not be set as the default one
+*/
+   void useCatalog(String catalogName);
 
 Review comment:
   This is against majority (if not all) code style guidelines. The `throws` clause should declare only those exceptions that we expect the user to handle.
   
   BTW, @twalthr and I had a chat a few times before and we feel we should revisit the exception structure in the catalog API anyway.
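A minimal illustration of the convention being argued for, with hypothetical exception types rather than Flink's actual hierarchy: making infrastructure failures unchecked keeps the `throws` clause reserved for conditions the caller can reasonably handle.

```java
public class ThrowsClauseSketch {
	// Unchecked: callers are not forced to wrap every call in try/catch
	// for failures (connection, authorization, ...) they cannot recover from.
	static class CatalogException extends RuntimeException {
		CatalogException(String message) {
			super(message);
		}
	}

	// No checked exception in the signature; only truly user-recoverable
	// conditions would warrant one.
	static void useCatalog(String catalogName) {
		if (catalogName == null || catalogName.isEmpty()) {
			throw new CatalogException("Catalog name cannot be null or empty.");
		}
		// ... switch the current catalog here ...
	}

	public static void main(String[] args) {
		try {
			useCatalog("");
		} catch (CatalogException e) {
			System.out.println(e.getMessage());
		}
	}
}
```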



[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284554790
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also 
implements the logic of
+ * table path resolution. Supports both new API ({@link Catalog} as well as 
{@link ExternalCatalog}).
+ */
+@Internal
+public class CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
+
+   // A map between names and catalogs.
+   private Map<String, Catalog> catalogs;
+
+   // TO BE REMOVED along with ExternalCatalog API
+   private Map<String, ExternalCatalog> externalCatalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   private String currentDatabaseName;
+
+   public CatalogManager(String defaultCatalogName, Catalog 
defaultCatalog) {
+   checkArgument(
+   !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName),
+   "Default catalog name cannot be null or empty");
+   checkNotNull(defaultCatalog, "Default catalog cannot be null");
+   catalogs = new LinkedHashMap<>();
+   externalCatalogs = new LinkedHashMap<>();
+   catalogs.put(defaultCatalogName, defaultCatalog);
+   this.currentCatalogName = defaultCatalogName;
+   this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
+   }
+
+   /**
+* Registers a catalog under the given name. The catalog name must be 
unique across both
+* {@link Catalog}s and {@link ExternalCatalog}s.
+*
+* @param catalogName name under which to register the given catalog
+* @param catalog catalog to register
+* @throws CatalogException if the registration of the catalog under 
the given name failed
+*/
+   public void registerCatalog(String catalogName, Catalog catalog) {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"Catalog name cannot be null or empty.");
+   checkNotNull(catalog, "Catalog cannot be null");
+
+   if (catalogs.containsKey(catalogName) || 
externalCatalogs.containsKey(catalogName)) {
+   throw new CatalogException(format("Catalog %s already 
exists.", catalogName));
+   }
+
+   catalogs.put(catalogName, catalog);
+   catalog.open();
+   }
+
+   /**
+* Gets a catalog by name.
+*
+* @param catalogName name of the catalog to retrieve
+* @return the requested catalog or empty if it does not exist
+* @see CatalogManager#getExternalCatalog(String)
+*/
+   public Optional<Catalog> getCatalog(String catalogName) {
+   return Optional.ofNullable(catalogs.get(catalogName));
+   }
+
+   /**
+* Registers an external catalog under the given name. The catalog name 
must be unique across both
+* 

[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-16 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284554711
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
 ##
 @@ -280,6 +317,54 @@
 */
void sqlUpdate(String stmt, QueryConfig config);
 
+   /**
+* Gets the current default catalog name of the current session.
+*
+* @return The current default catalog name that is used for the path 
resolution.
+* @see TableEnvironment#useCatalog(String)
+*/
+   String getCurrentCatalog();
+
+   /**
+* Sets the current catalog to the given value. It also sets the default
+* database to the catalog's default one. To assign both catalog and 
database explicitly
+* see {@link TableEnvironment#useDatabase(String, String)}.
+*
+* This is used during the resolution of object paths. The default path is constructed as
+* {@code [current-catalog].[current-database]}. During the resolution, we first look for
+* {@code [default-path].[object-path]}; if no object is found, we assume the object path is a
+* fully qualified one and look under {@code [object-path]}.
+*
+* @param catalogName The name of the catalog to set as the current 
default catalog.
+* @throws CatalogException thrown if a catalog with given name could 
not be set as the default one
 
 Review comment:
   Connection, Authorization ... 



[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-15 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284103981
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/TableEnvImpl.scala
 ##
 @@ -69,45 +66,19 @@ import _root_.scala.collection.mutable
   */
 abstract class TableEnvImpl(val config: TableConfig) extends TableEnvironment {
 
-  // the catalog to hold all registered and translated tables
-  // we disable caching here to prevent side effects
-  private val internalSchema: CalciteSchema = 
CalciteSchema.createRootSchema(false, false)
-  private val rootSchema: SchemaPlus = internalSchema.plus()
-
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog()
 
-  // the configuration to create a Calcite planner
-  private lazy val frameworkConfig: FrameworkConfig = Frameworks
-.newConfigBuilder
-.defaultSchema(rootSchema)
-.parserConfig(getSqlParserConfig)
-.costFactory(new DataSetCostFactory)
-.typeSystem(new FlinkTypeSystem)
-.operatorTable(getSqlOperatorTable)
-.sqlToRelConverterConfig(getSqlToRelConverterConfig)
-// the converter is needed when calling temporal table functions from SQL, 
because
-// they reference a history table represented with a tree of table 
operations
-.context(Contexts.of(
-  new TableOperationConverter.ToRelConverterSupplier(expressionBridge)
-))
-// set the executor to evaluate constant expressions
-.executor(new ExpressionReducer(config))
-.build
+  private val BUILTIN_CATALOG_NAME = "builtin"
 
 Review comment:
   Does `CatalogManager` need to know about it? This is just an initial value for the current catalog, so I think it fits better in the `TableEnvironment` rather than the `CatalogManager`.
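The suggestion above can be sketched as follows: the manager only receives the initial catalog name through its constructor, while the `"builtin"` constant stays in the environment that constructs it. This is a simplified sketch (the catalog is a plain `Object` placeholder here), not the actual class:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CatalogManagerSketch {
	private final Map<String, Object> catalogs = new LinkedHashMap<>();
	private String currentCatalogName;

	// The manager is agnostic of any particular default name; it just
	// registers whatever it is handed and makes it current.
	public CatalogManagerSketch(String defaultCatalogName, Object defaultCatalog) {
		catalogs.put(defaultCatalogName, defaultCatalog);
		this.currentCatalogName = defaultCatalogName;
	}

	public String getCurrentCatalog() {
		return currentCatalogName;
	}

	public static void main(String[] args) {
		// Hypothetical wiring, mirroring TableEnvImpl's BUILTIN_CATALOG_NAME.
		String builtinCatalogName = "builtin";
		CatalogManagerSketch manager = new CatalogManagerSketch(builtinCatalogName, new Object());
		System.out.println(manager.getCurrentCatalog()); // prints "builtin"
	}
}
```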



[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-15 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r284103693
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink catalog's database and Calcite's schema.
+ * Tables are registered as tables in the schema.
+ */
+class DatabaseCalciteSchema implements Schema {
+   private final String dbName;
+   private final Catalog catalog;
+
+   public DatabaseCalciteSchema(String dbName, Catalog catalog) {
+   this.dbName = dbName;
+   this.catalog = catalog;
+   }
+
+   @Override
+   public Table getTable(String tableName) {
+
+   ObjectPath tablePath = new ObjectPath(dbName, tableName);
+
+   try {
+   if (!catalog.tableExists(tablePath)) {
+   return null;
+   }
+
+   CatalogBaseTable table = catalog.getTable(tablePath);
+
+   if (table instanceof CalciteCatalogTable) {
+   return ((CalciteCatalogTable) table).getTable();
+   } else {
+   throw new TableException("Unsupported table type: " + table);
 
 Review comment:
   For the `CatalogTable` we still need to agree on how we retrieve/create the `TableSourceFactory`: should we get it from the catalog or via service discovery?
   
   For a `CatalogView` we could add support already, but in this PR I wanted to focus on just the existing functionality.




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-15 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283916005
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
 ##
 @@ -118,8 +118,12 @@ abstract class TableTestUtil {
 }
 else if (expectedLine == TableTestUtil.ANY_SUBTREE) {
   break
-}
-else if (expectedLine != actualLine) {
+} else if (!verifyCatalogPath && actualLine.contains("table=[[")) {
 
 Review comment:
   I did not want to change all of the tests. That's why I added an option to 
strip the default catalog & database during validation.
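The stripping described above can be reduced to a small helper. This is a hypothetical sketch (the class and method names, and the `table=[[default_catalog, default_database, ...]]` plan-line format, are assumptions for illustration, not the actual `TableTestBase` code):

```java
import java.util.regex.Pattern;

// Hypothetical sketch: removes the default catalog & database qualifier
// from a plan line before it is compared against the expected plan, so
// existing tests do not have to be rewritten with fully qualified paths.
public class PlanNormalizer {

    private static final Pattern DEFAULT_PATH =
            Pattern.compile("table=\\[\\[default_catalog, default_database, ");

    public static String stripDefaultCatalogPath(String actualLine) {
        // Lines without a table reference pass through unchanged.
        return DEFAULT_PATH.matcher(actualLine).replaceAll("table=[[");
    }
}
```

With this, `"table=[[default_catalog, default_database, tab1]]"` normalizes to `"table=[[tab1]]"`, matching pre-catalog expected plans.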






[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283915654
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink catalog's database and Calcite's schema.
+ * Tables are registered as tables in the schema.
+ */
+class DatabaseCalciteSchema implements Schema {
+   private final String dbName;
+   private final Catalog catalog;
+
+   public DatabaseCalciteSchema(String dbName, Catalog catalog) {
+   this.dbName = dbName;
+   this.catalog = catalog;
+   }
+
+   @Override
+   public Table getTable(String tableName) {
+
+   ObjectPath tablePath = new ObjectPath(dbName, tableName);
+
+   try {
+   if (!catalog.tableExists(tablePath)) {
+   return null;
+   }
+
+   CatalogBaseTable table = catalog.getTable(tablePath);
+
+   if (table instanceof CalciteCatalogTable) {
+   return ((CalciteCatalogTable) table).getTable();
+   } else {
+   throw new TableException("Unsupported table type: " + table);
+   }
+   } catch (Exception e) {
+   throw new TableException("Could not find table: " + tableName, e);
 
 Review comment:
   Actually, we should never end up here with a `TableNotExistException`, as we check `catalog.tableExists()` beforehand. That's why I went with `TableException` here.
   
   I added an appropriate comment there.
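The check-then-get pattern being discussed can be shown as a minimal, self-contained sketch (a plain map stands in for the `Catalog` here; this is an illustration, not the Flink API):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of the pattern from the diff above: existence is
// checked first, so the subsequent lookup should never report "not found";
// any failure past that point is unexpected and wrapped generically.
public class CheckThenGet {

    private final Map<String, String> tables = new HashMap<>();

    public void register(String name, String table) {
        tables.put(name, table);
    }

    public String getTable(String name) {
        if (!tables.containsKey(name)) {
            return null; // mirrors returning null for a missing table
        }
        try {
            // The lookup cannot miss here; see the existence check above.
            return tables.get(name);
        } catch (RuntimeException e) {
            // Anything thrown here is unexpected, hence the generic wrapper.
            throw new IllegalStateException("Could not find table: " + name, e);
        }
    }
}
```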




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283851812
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##
 @@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogAlreadyExistsException;
+import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.ExternalCatalogAlreadyExistException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.CatalogTableOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A CatalogManager that encapsulates all available catalogs. It also implements the logic of
+ * table path resolution. Supports both the new API ({@link ReadableCatalog}) as well as {@link ExternalCatalog}.
+ */
+@Internal
+public class CatalogManager {
+   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
+
+   // A map between names and catalogs.
+   private Map&lt;String, Catalog&gt; catalogs;
+
+   // TO BE REMOVED along with ExternalCatalog API
+   private Map&lt;String, ExternalCatalog&gt; externalCatalogs;
+
+   // The name of the default catalog and schema
+   private String currentCatalogName;
+
+   private String currentDatabaseName;
+
+   public CatalogManager(String defaultCatalogName, Catalog 
defaultCatalog) {
+   catalogs = new LinkedHashMap<>();
+   externalCatalogs = new LinkedHashMap<>();
+   catalogs.put(defaultCatalogName, defaultCatalog);
+   this.currentCatalogName = defaultCatalogName;
+   this.currentDatabaseName = defaultCatalog.getCurrentDatabase();
+   }
+
+   /**
+* Registers a catalog under the given name. The catalog name must be 
unique across both
+* {@link Catalog}s and {@link ExternalCatalog}s.
+*
+* @param catalogName name under which to register the given catalog
+* @param catalog catalog to register
+* @throws CatalogAlreadyExistsException thrown if the name is already 
taken
+*/
+   public void registerCatalog(String catalogName, Catalog catalog) throws 
CatalogAlreadyExistsException {
+   checkArgument(!StringUtils.isNullOrWhitespaceOnly(catalogName), 
"Catalog name cannot be null or empty.");
+   checkNotNull(catalog, "Catalog cannot be null");
+
+   if (catalogs.containsKey(catalogName) || 
externalCatalogs.containsKey(catalogName)) {
+   throw new CatalogAlreadyExistsException(catalogName);
+   }
+
+   catalogs.put(catalogName, catalog);
+   catalog.open();
+   }
+
+   /**
+* Gets a catalog by name.
+*
+* @param catalogName name of the catalog to retrieve
+* @return the requested catalog
+* @throws CatalogNotExistException thrown if the catalog doesn't exist
+* @see CatalogManager#getExternalCatalog(String)
+*/
+   public Catalog getCatalog(String catalogName) throws 
CatalogNotExistException {
+   if (!catalogs.keySet().contains(catalogName)) {
+   throw new CatalogNotExistException(catalogName);
+   }
+
+   return catalogs.get(catalogName);
+   }
+
+   /**
+* Registers an external catalog under the given 

[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283845430
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/ExternalTableUtil.scala
 ##
 @@ -40,63 +40,67 @@ object ExternalTableUtil extends Logging {
 * @param externalTable the [[ExternalCatalogTable]] instance which to 
convert
 * @return converted [[TableSourceTable]] instance from the input catalog 
table
 */
-  def fromExternalCatalogTable[T1, T2](
-  tableEnv: TableEnvironment,
-  externalTable: ExternalCatalogTable)
+  def fromExternalCatalogTable[T1, T2](isBatch: Boolean, externalTable: 
ExternalCatalogTable)
 : TableSourceSinkTable[T1, T2] = {
 
 val statistics = new FlinkStatistic(toScala(externalTable.getTableStats))
 
 val source: Option[TableSourceTable[T1]] = if 
(externalTable.isTableSource) {
-  Some(createTableSource(tableEnv, externalTable, statistics))
+  Some(createTableSource(isBatch, externalTable, statistics))
 } else {
   None
 }
 
 val sink: Option[TableSinkTable[T2]] = if (externalTable.isTableSink) {
-  Some(createTableSink(tableEnv, externalTable, statistics))
+  Some(createTableSink(isBatch, externalTable, statistics))
 } else {
   None
 }
 
 new TableSourceSinkTable[T1, T2](source, sink)
   }
 
+  def getTableSchema(externalTable: ExternalCatalogTable) : TableSchema = {
+if (externalTable.isTableSource) {
+  
TableFactoryUtil.findAndCreateTableSource[Any](externalTable).getTableSchema
+} else {
+  val tableSink = TableFactoryUtil.findAndCreateTableSink(externalTable)
+  new TableSchema(tableSink.getFieldNames, tableSink.getFieldTypes)
+}
+  }
+
   private def createTableSource[T](
-  tableEnv: TableEnvironment,
+  isBatch: Boolean,
   externalTable: ExternalCatalogTable,
   statistics: FlinkStatistic)
-: TableSourceTable[T] = tableEnv match {
-
-case _: BatchTableEnvImpl if externalTable.isBatchTable =>
+: TableSourceTable[T] = {
+if (isBatch && externalTable.isBatchTable) {
   val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
   new BatchTableSourceTable[T](source.asInstanceOf[BatchTableSource[T]], 
statistics)
-
-case _: StreamTableEnvImpl if externalTable.isStreamTable =>
+} else if (!isBatch && externalTable.isStreamTable) {
   val source = TableFactoryUtil.findAndCreateTableSource(externalTable)
   new StreamTableSourceTable[T](source.asInstanceOf[StreamTableSource[T]], 
statistics)
-
-case _ =>
+} else {
   throw new ValidationException(
 "External catalog table does not support the current environment for a 
table source.")
 
 Review comment:
   Yes, but I think this message applies to both of those. BTW the logic has 
not changed, just the type of the flag has changed.




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283841499
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerSchema.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.CatalogNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Table;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Bridge between {@link CatalogManager} and {@link Schema}. This way we can 
query Flink's specific catalogs
+ * from Calcite.
+ *
+ * The mapping for {@link Catalog}s is modeled as a strict two-level reference structure for Flink in Calcite;
+ * the full path of tables and views is of the format [catalog_name].[db_name].[meta-object_name].
+ *
+ * It also supports {@link ExternalCatalog}s. An external catalog maps 1:1 
to Calcite's schema.
+ */
+@Internal
+public class CatalogManagerSchema implements Schema {
+
+   private final CatalogManager catalogManager;
+   private Boolean isBatch;
 
 Review comment:
   Sorry about that. :( That's how working in Scala & Java at the same time 
ends.




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283839291
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.Table;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Thin wrapper around Calcite specific {@link Table}, this is a temporary 
solution
+ * that allows to register those tables in the {@link CatalogManager}.
+ * TODO remove once we decouple TableEnvironment from Calcite.
+ */
+@Internal
+public class CalciteCatalogTable implements CatalogBaseTable {
+   private final Table table;
+   private final FlinkTypeFactory typeFactory;
+
+   public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
+   this.table = table;
+   this.typeFactory = typeFactory;
+   }
+
+   public Table getTable() {
+   return table;
+   }
+
+   @Override
+   public Map&lt;String, String&gt; getProperties() {
+   throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties.");
 
 Review comment:
   I thought this method represented a serialized (`toProperties`) representation of the `Table`, but apparently I was wrong. I will fix this; as was explained to me, those are just additional properties.






[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283823660
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A mapping between Flink's catalog and Calcite's schema. This enables looking up and accessing tables
+ * in SQL queries without registering them in advance. Databases are registered as sub-schemas in the schema.
+ */
+@Internal
+public class CatalogCalciteSchema implements Schema {
+   private static final Logger LOGGER = 
LoggerFactory.getLogger(CatalogCalciteSchema.class);
+
+   private final String catalogName;
+   private final Catalog catalog;
+
+   public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+   this.catalogName = catalogName;
+   this.catalog = catalog;
+   }
+
+   /**
+* Look up a sub-schema (database) by the given sub-schema name.
+*
+* @param schemaName name of sub-schema to look up
+* @return the sub-schema with a given dbName, or null
+*/
+   @Override
+   public Schema getSubSchema(String schemaName) {
+
+   if (catalog.databaseExists(schemaName)) {
+   return new DatabaseCalciteSchema(schemaName, catalog);
+   } else {
+   LOGGER.error(String.format("Schema %s does not exist in 
catalog %s", schemaName, catalogName));
+   throw new CatalogException(new 
DatabaseNotExistException(catalogName, schemaName));
+   }
+   }
+
+   @Override
+   public Set getSubSchemaNames() {
+   return new HashSet<>(catalog.listDatabases());
+   }
+
+   @Override
+   public Table getTable(String name) {
+   return null;
+   }
+
+   @Override
+   public Set getTableNames() {
+   return new HashSet<>();
+   }
+
+   @Override
+   public RelProtoDataType getType(String name) {
+   return null;
 
 Review comment:
   AFAIK we do not support storing user-defined types in the `Catalog`, do we? That's why I think it is desirable to return null here until we do.




[GitHub] [flink] dawidwys commented on a change in pull request #8404: [FLINK-11476][table] Create CatalogManager to manage multiple catalogs

2019-05-14 Thread GitBox
dawidwys commented on a change in pull request #8404: [FLINK-11476][table] 
Create CatalogManager to manage multiple catalogs
URL: https://github.com/apache/flink/pull/8404#discussion_r283818770
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CalciteCatalogTable.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.schema.Table;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Thin wrapper around Calcite specific {@link Table}, this is a temporary 
solution
+ * that allows to register those tables in the {@link CatalogManager}.
+ * TODO remove once we decouple TableEnvironment from Calcite.
+ */
+@Internal
+public class CalciteCatalogTable implements CatalogBaseTable {
+   private final Table table;
+   private final FlinkTypeFactory typeFactory;
+
+   public CalciteCatalogTable(Table table, FlinkTypeFactory typeFactory) {
+   this.table = table;
+   this.typeFactory = typeFactory;
+   }
+
+   public Table getTable() {
+   return table;
+   }
+
+   @Override
+   public Map&lt;String, String&gt; getProperties() {
+   throw new UnsupportedOperationException("Calcite table cannot be expressed as a map of properties.");
+   }
+
+   @Override
+   public TableSchema getSchema() {
+   RelDataType relDataType = table.getRowType(typeFactory);
+
+   String[] fieldNames = relDataType.getFieldNames().toArray(new String[0]);
+   TypeInformation[] fieldTypes = relDataType.getFieldList()
+   .stream()
+   .map(field -> FlinkTypeFactory.toTypeInfo(field.getType()))
+   .toArray(TypeInformation[]::new);
+
+   return new TableSchema(fieldNames, fieldTypes);
+   }
+
+   @Override
+   public String getComment() {
+   return null;
+   }
+
+   @Override
+   public CatalogBaseTable copy() {
+   return this;
 
 Review comment:
   I agree this would be better. Unfortunately, this makes it impossible to register the table in the catalog, as a copy is created on either `createTable` or `getTable`.

