This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 243fb4d  [FLINK-13296][table] FunctionCatalog.lookupFunction() should 
check in memory functions if the target function doesn't exist in catalog
243fb4d is described below

commit 243fb4d1237b86db8f9245954a8150236360034f
Author: bowen.li <bowenl...@gmail.com>
AuthorDate: Tue Jul 16 11:30:31 2019 -0700

    [FLINK-13296][table] FunctionCatalog.lookupFunction() should check in 
memory functions if the target function doesn't exist in catalog
    
    Currently the logic to look up a function checks either the catalog or the 
in-memory functions, but not both. The correct logic is to first check the 
catalog and, if the function doesn't exist there, then check the in-memory 
functions. There should be a resolution order.
    
    This closes #9135.
---
 .../client/gateway/local/LocalExecutorITCase.java  |  2 -
 .../flink/table/catalog/FunctionCatalog.java       | 65 +++++++++++-----------
 2 files changed, 34 insertions(+), 33 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 18d794c..ac1a7ae 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -49,7 +49,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -430,7 +429,6 @@ public class LocalExecutorITCase extends TestLogger {
                }
        }
 
-       @Ignore
        @Test
        public void testUseCatalogAndUseDatabase() throws Exception {
                final String csvOutputPath = new 
File(tempFolder.newFolder().getAbsolutePath(), 
"test-out.csv").toURI().toString();
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index af7f21f..01639b8 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -159,56 +159,59 @@ public class FunctionCatalog implements FunctionLookup {
        public Optional<FunctionLookup.Result> lookupFunction(String name) {
                String functionName = normalizeName(name);
 
-               FunctionDefinition userCandidate = null;
+               FunctionDefinition userCandidate;
 
                Catalog catalog = 
catalogManager.getCatalog(catalogManager.getCurrentCatalog()).get();
 
-               if (catalog.getTableFactory().isPresent() &&
+               try {
+                       CatalogFunction catalogFunction = catalog.getFunction(
+                               new 
ObjectPath(catalogManager.getCurrentDatabase(), functionName));
+
+                       if (catalog.getTableFactory().isPresent() &&
                                catalog.getTableFactory().get() instanceof 
FunctionDefinitionFactory) {
-                       try {
-                               CatalogFunction catalogFunction = 
catalog.getFunction(
-                                       new 
ObjectPath(catalogManager.getCurrentDatabase(), functionName));
 
                                FunctionDefinitionFactory factory = 
(FunctionDefinitionFactory) catalog.getTableFactory().get();
 
                                userCandidate = 
factory.createFunctionDefinition(functionName, catalogFunction);
-                       } catch (FunctionNotExistException e) {
-                               // Ignore
-                       }
 
-                       return Optional.of(
+                               return Optional.of(
                                        new FunctionLookup.Result(
                                                
ObjectIdentifier.of(catalogManager.getCurrentCatalog(), 
catalogManager.getCurrentDatabase(), name),
                                                userCandidate)
                                );
-               } else {
-                       // Else, check in-memory functions
-                       userCandidate = userFunctions.get(functionName);
-
-                       final Optional<FunctionDefinition> foundDefinition;
-                       if (userCandidate != null) {
-                               foundDefinition = Optional.of(userCandidate);
                        } else {
+                               // TODO: should go thru function definition 
discover service
+                       }
+               } catch (FunctionNotExistException e) {
+                       // Ignore
+               }
 
-                               // TODO once we connect this class with the 
Catalog APIs we need to make sure that
-                               //  built-in functions are present in "root" 
built-in catalog. This allows to
-                               //  overwrite built-in functions but also 
fallback to the "root" catalog. It should be
-                               //  possible to disable the "root" catalog if 
that is desired.
+               // If no corresponding function is found in catalog, check 
in-memory functions
+               userCandidate = userFunctions.get(functionName);
 
-                               foundDefinition = 
BuiltInFunctionDefinitions.getDefinitions()
-                                       .stream()
-                                       .filter(f -> 
functionName.equals(normalizeName(f.getName())))
-                                       .findFirst()
-                                       .map(Function.identity());
-                       }
+               final Optional<FunctionDefinition> foundDefinition;
+               if (userCandidate != null) {
+                       foundDefinition = Optional.of(userCandidate);
+               } else {
 
-                       String defaultCatalogName = 
catalogManager.getDefaultCatalogName();
+                       // TODO once we connect this class with the Catalog 
APIs we need to make sure that
+                       //  built-in functions are present in "root" built-in 
catalog. This allows to
+                       //  overwrite built-in functions but also fallback to 
the "root" catalog. It should be
+                       //  possible to disable the "root" catalog if that is 
desired.
 
-                       return foundDefinition.map(definition -> new 
FunctionLookup.Result(
-                               ObjectIdentifier.of(defaultCatalogName, 
catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
-                               definition)
-                       );
+                       foundDefinition = 
BuiltInFunctionDefinitions.getDefinitions()
+                               .stream()
+                               .filter(f -> 
functionName.equals(normalizeName(f.getName())))
+                               .findFirst()
+                               .map(Function.identity());
                }
+
+               String defaultCatalogName = 
catalogManager.getDefaultCatalogName();
+
+               return foundDefinition.map(definition -> new 
FunctionLookup.Result(
+                       ObjectIdentifier.of(defaultCatalogName, 
catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+                       definition)
+               );
        }
 
        @Override

Reply via email to