This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new f11cdd3  [FLINK-13653][sql-client] ResultStore should avoid using RowTypeInfo when creating a result
f11cdd3 is described below

commit f11cdd3bed438d46583368284afce74bc4ecd703
Author: Rui Li <li...@apache.org>
AuthorDate: Tue Aug 13 20:57:18 2019 +0800

    [FLINK-13653][sql-client] ResultStore should avoid using RowTypeInfo when creating a result
    
    Fix the issue that types with parameters, e.g. DECIMAL, cannot be accessed via the SQL client.
    
    This closes #9432.
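
A minimal sketch (not part of this patch) of why the change matters: RowTypeInfo is
backed by the legacy TypeInformation stack, which cannot carry type parameters such
as decimal precision/scale or char/varchar lengths, whereas TableSchema exposes the
row as a DataType that keeps them. The sketch uses only API calls that appear in the
patch itself (TableSchema.builder().fields(...), DataTypes.*, toRowDataType()):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.types.DataType;

    public class ParameterizedTypesSketch {
        public static void main(String[] args) {
            // Schema with parameterized types, mirroring the test table added in DependencyTest.
            TableSchema schema = TableSchema.builder().fields(
                    new String[]{"dec", "ch", "vch"},
                    new DataType[]{DataTypes.DECIMAL(10, 10), DataTypes.CHAR(5), DataTypes.VARCHAR(15)})
                    .build();

            // toRowDataType() keeps DECIMAL(10, 10), CHAR(5) and VARCHAR(15) intact; this row
            // DataType is what the sinks now report instead of a parameterless RowTypeInfo.
            DataType rowType = schema.toRowDataType();
            System.out.println(rowType);
        }
    }
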
---
 .../gateway/local/CollectBatchTableSink.java       | 28 +++++---------
 .../gateway/local/CollectStreamTableSink.java      | 24 ++++--------
 .../table/client/gateway/local/ResultStore.java    |  5 ++-
 .../local/result/ChangelogCollectStreamResult.java |  5 ++-
 .../gateway/local/result/CollectStreamResult.java  |  6 +--
 .../result/MaterializedCollectBatchResult.java     |  6 +--
 .../result/MaterializedCollectStreamResult.java    |  6 ++-
 .../table/client/gateway/local/DependencyTest.java | 13 +++++++
 .../client/gateway/local/LocalExecutorITCase.java  | 43 ++++++++++++++++++++--
 .../MaterializedCollectStreamResultTest.java       | 13 +++++++
 .../src/test/resources/test-data-1.csv             | 18 +++++++++
 .../test/resources/test-sql-client-catalogs.yaml   | 21 +++++++++++
 12 files changed, 139 insertions(+), 49 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
index 9dccfb7..1e3ca21 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
@@ -20,12 +20,13 @@ package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.BatchTableSink;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 /**
@@ -35,13 +36,12 @@ public class CollectBatchTableSink extends OutputFormatTableSink<Row> implements
 
        private final String accumulatorName;
        private final TypeSerializer<Row> serializer;
+       private final TableSchema tableSchema;
 
-       private String[] fieldNames;
-       private TypeInformation<?>[] fieldTypes;
-
-       public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer) {
+       public CollectBatchTableSink(String accumulatorName, TypeSerializer<Row> serializer, TableSchema tableSchema) {
                this.accumulatorName = accumulatorName;
                this.serializer = serializer;
+               this.tableSchema = tableSchema;
        }
 
        /**
@@ -52,26 +52,18 @@ public class CollectBatchTableSink extends OutputFormatTableSink<Row> implements
        }
 
        @Override
-       public TypeInformation<Row> getOutputType() {
-               return Types.ROW_NAMED(fieldNames, fieldTypes);
-       }
-
-       @Override
-       public String[] getFieldNames() {
-               return fieldNames;
+       public DataType getConsumedDataType() {
+               return getTableSchema().toRowDataType();
        }
 
        @Override
-       public TypeInformation<?>[] getFieldTypes() {
-               return fieldTypes;
+       public TableSchema getTableSchema() {
+               return tableSchema;
        }
 
        @Override
        public CollectBatchTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-               final CollectBatchTableSink copy = new CollectBatchTableSink(accumulatorName, serializer);
-               copy.fieldNames = fieldNames;
-               copy.fieldTypes = fieldTypes;
-               return copy;
+               return new CollectBatchTableSink(accumulatorName, serializer, tableSchema);
        }
 
        @Override
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
index ce8565a..63adb07 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.types.Row;
 
@@ -39,37 +40,28 @@ public class CollectStreamTableSink implements RetractStreamTableSink<Row> {
        private final InetAddress targetAddress;
        private final int targetPort;
        private final TypeSerializer<Tuple2<Boolean, Row>> serializer;
+       private final TableSchema tableSchema;
 
-       private String[] fieldNames;
-       private TypeInformation<?>[] fieldTypes;
-
-       public CollectStreamTableSink(InetAddress targetAddress, int targetPort, TypeSerializer<Tuple2<Boolean, Row>> serializer) {
+       public CollectStreamTableSink(InetAddress targetAddress, int targetPort, TypeSerializer<Tuple2<Boolean, Row>> serializer, TableSchema tableSchema) {
                this.targetAddress = targetAddress;
                this.targetPort = targetPort;
                this.serializer = serializer;
+               this.tableSchema = tableSchema;
        }
 
        @Override
-       public String[] getFieldNames() {
-               return fieldNames;
-       }
-
-       @Override
-       public TypeInformation<?>[] getFieldTypes() {
-               return fieldTypes;
+       public TableSchema getTableSchema() {
+               return tableSchema;
        }
 
        @Override
        public CollectStreamTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-               final CollectStreamTableSink copy = new CollectStreamTableSink(targetAddress, targetPort, serializer);
-               copy.fieldNames = fieldNames;
-               copy.fieldTypes = fieldTypes;
-               return copy;
+               return new CollectStreamTableSink(targetAddress, targetPort, serializer, tableSchema);
        }
 
        @Override
        public TypeInformation<Row> getRecordType() {
-               return Types.ROW_NAMED(fieldNames, fieldTypes);
+               return getTableSchema().toRowType();
        }
 
        @Override
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index c3cc12b..27a5278 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -67,10 +67,11 @@ public class ResultStore {
                        final int gatewayPort = getGatewayPort(env.getDeployment());
 
                        if (env.getExecution().isChangelogMode()) {
-                               return new ChangelogCollectStreamResult<>(outputType, config, gatewayAddress, gatewayPort);
+                               return new ChangelogCollectStreamResult<>(outputType, schema, config, gatewayAddress, gatewayPort);
                        } else {
                                return new MaterializedCollectStreamResult<>(
                                        outputType,
+                                       schema,
                                        config,
                                        gatewayAddress,
                                        gatewayPort,
@@ -82,7 +83,7 @@ public class ResultStore {
                        if (!env.getExecution().isTableMode()) {
                                throw new SqlExecutionException("Results of batch queries can only be served in table mode.");
                        }
-                       return new MaterializedCollectBatchResult<>(outputType, config);
+                       return new MaterializedCollectBatchResult<>(schema, outputType, config);
                }
        }
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
index 38e9126..43f2de5 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/ChangelogCollectStreamResult.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.client.gateway.local.result;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.types.Row;
 
@@ -38,9 +39,9 @@ public class ChangelogCollectStreamResult<C> extends CollectStreamResult<C> impl
        private List<Tuple2<Boolean, Row>> changeRecordBuffer;
        private static final int CHANGE_RECORD_BUFFER_SIZE = 5_000;
 
-       public ChangelogCollectStreamResult(RowTypeInfo outputType, ExecutionConfig config,
+       public ChangelogCollectStreamResult(RowTypeInfo outputType, TableSchema tableSchema, ExecutionConfig config,
                        InetAddress gatewayAddress, int gatewayPort) {
-               super(outputType, config, gatewayAddress, gatewayPort);
+               super(outputType, tableSchema, config, gatewayAddress, gatewayPort);
 
                // prepare for changelog
                changeRecordBuffer = new ArrayList<>();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
index 6cdebca..fe74cb9 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/CollectStreamResult.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.experimental.SocketStreamIterator;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
@@ -55,7 +56,7 @@ public abstract class CollectStreamResult<C> extends BasicResult<C> implements D
        protected final Object resultLock;
        protected SqlExecutionException executionException;
 
-       public CollectStreamResult(RowTypeInfo outputType, ExecutionConfig config,
+       public CollectStreamResult(RowTypeInfo outputType, TableSchema tableSchema, ExecutionConfig config,
                        InetAddress gatewayAddress, int gatewayPort) {
                this.outputType = outputType;
 
@@ -73,8 +74,7 @@ public abstract class CollectStreamResult<C> extends BasicResult<C> implements D
 
                // create table sink
                // pass binding address and port such that sink knows where to send to
-               collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer)
-                       .configure(outputType.getFieldNames(), outputType.getFieldTypes());
+               collectTableSink = new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer, tableSchema);
                retrievalThread = new ResultRetrievalThread();
                monitoringThread = new JobMonitoringThread();
        }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
index dc482d0..2fe61b8 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectBatchResult.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.local.CollectBatchTableSink;
@@ -54,12 +55,11 @@ public class MaterializedCollectBatchResult<C> extends BasicResult<C> implements
 
        private volatile boolean snapshotted = false;
 
-       public MaterializedCollectBatchResult(RowTypeInfo outputType, ExecutionConfig config) {
+       public MaterializedCollectBatchResult(TableSchema tableSchema, RowTypeInfo outputType, ExecutionConfig config) {
                this.outputType = outputType;
 
                accumulatorName = new AbstractID().toString();
-               tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config))
-                       .configure(outputType.getFieldNames(), outputType.getFieldTypes());
+               tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config), tableSchema);
                resultLock = new Object();
                retrievalThread = new ResultRetrievalThread();
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 2becfda..7398758 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.types.Row;
@@ -91,12 +92,13 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
        @VisibleForTesting
        public MaterializedCollectStreamResult(
                        RowTypeInfo outputType,
+                       TableSchema tableSchema,
                        ExecutionConfig config,
                        InetAddress gatewayAddress,
                        int gatewayPort,
                        int maxRowCount,
                        int overcommitThreshold) {
-               super(outputType, config, gatewayAddress, gatewayPort);
+               super(outputType, tableSchema, config, gatewayAddress, gatewayPort);
 
                if (maxRowCount <= 0) {
                        this.maxRowCount = Integer.MAX_VALUE;
@@ -118,6 +120,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
 
        public MaterializedCollectStreamResult(
                        RowTypeInfo outputType,
+                       TableSchema tableSchema,
                        ExecutionConfig config,
                        InetAddress gatewayAddress,
                        int gatewayPort,
@@ -125,6 +128,7 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
 
                this(
                        outputType,
+                       tableSchema,
                        config,
                        gatewayAddress,
                        gatewayPort,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 5fabe83..aad2da1 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -44,6 +45,7 @@ import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase;
 import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.types.DataType;
 
 import org.junit.Test;
 
@@ -172,6 +174,7 @@ public class DependencyTest {
        public static class TestHiveCatalogFactory extends HiveCatalogFactory {
                public static final String ADDITIONAL_TEST_DATABASE = "additional_test_database";
                public static final String TEST_TABLE = "test_table";
+               static final String TABLE_WITH_PARAMETERIZED_TYPES = "para_types_table";
 
                @Override
                public Map<String, String> requiredContext() {
@@ -213,11 +216,21 @@ public class DependencyTest {
                                        ),
                                        false
                                );
+                               // create a table to test parameterized types
+                               hiveCatalog.createTable(new ObjectPath("default", TABLE_WITH_PARAMETERIZED_TYPES),
+                                               tableWithParameterizedTypes(),
+                                               false);
                        } catch (DatabaseAlreadyExistException | TableAlreadyExistException | DatabaseNotExistException e) {
                                throw new CatalogException(e);
                        }
 
                        return hiveCatalog;
                }
+
+               private CatalogTable tableWithParameterizedTypes() {
+                       TableSchema tableSchema = TableSchema.builder().fields(new String[]{"dec", "ch", "vch"},
+                                       new DataType[]{DataTypes.DECIMAL(10, 10), DataTypes.CHAR(5), DataTypes.VARCHAR(15)}).build();
+                       return new CatalogTableImpl(tableSchema, Collections.emptyMap(), "");
+               }
        }
 }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index cbae581..c102acf 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -49,6 +49,7 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -479,11 +480,14 @@ public class LocalExecutorITCase extends TestLogger {
        @Test
        public void testUseCatalogAndUseDatabase() throws Exception {
                final String csvOutputPath = new File(tempFolder.newFolder().getAbsolutePath(), "test-out.csv").toURI().toString();
-               final URL url = getClass().getClassLoader().getResource("test-data.csv");
-               Objects.requireNonNull(url);
+               final URL url1 = getClass().getClassLoader().getResource("test-data.csv");
+               final URL url2 = getClass().getClassLoader().getResource("test-data-1.csv");
+               Objects.requireNonNull(url1);
+               Objects.requireNonNull(url2);
                final Map<String, String> replaceVars = new HashMap<>();
                replaceVars.put("$VAR_PLANNER", planner);
-               replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
+               replaceVars.put("$VAR_SOURCE_PATH1", url1.getPath());
+               replaceVars.put("$VAR_SOURCE_PATH2", url2.getPath());
                replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
                replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
                replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
@@ -501,7 +505,8 @@ public class LocalExecutorITCase extends TestLogger {
                                Arrays.asList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, HiveCatalog.DEFAULT_DB),
                                executor.listDatabases(session));
 
-                       assertEquals(Collections.emptyList(), executor.listTables(session));
+                       assertEquals(Collections.singletonList(DependencyTest.TestHiveCatalogFactory.TABLE_WITH_PARAMETERIZED_TYPES),
+                                       executor.listTables(session));
 
                        executor.useDatabase(session, DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE);
 
@@ -529,6 +534,36 @@ public class LocalExecutorITCase extends TestLogger {
                executor.useCatalog(session, "nonexistingcatalog");
        }
 
+       @Test
+       public void testParameterizedTypes() throws Exception {
+               // only blink planner supports parameterized types
+               Assume.assumeTrue(planner.equals("blink"));
+               final URL url1 = getClass().getClassLoader().getResource("test-data.csv");
+               final URL url2 = getClass().getClassLoader().getResource("test-data-1.csv");
+               Objects.requireNonNull(url1);
+               Objects.requireNonNull(url2);
+               final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
+               replaceVars.put("$VAR_SOURCE_PATH1", url1.getPath());
+               replaceVars.put("$VAR_SOURCE_PATH2", url2.getPath());
+               replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
+               replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
+               replaceVars.put("$VAR_MAX_ROWS", "100");
+               replaceVars.put("$VAR_RESULT_MODE", "table");
+
+               final Executor executor = createModifiedExecutor(CATALOGS_ENVIRONMENT_FILE, clusterClient, replaceVars);
+               final SessionContext session = new SessionContext("test-session", new Environment());
+               executor.useCatalog(session, "hivecatalog");
+               String resultID = executor.executeQuery(session,
+                               "select * from " + DependencyTest.TestHiveCatalogFactory.TABLE_WITH_PARAMETERIZED_TYPES).getResultId();
+               retrieveTableResult(executor, session, resultID);
+
+               // make sure legacy types still work
+               executor.useCatalog(session, "default_catalog");
+               resultID = executor.executeQuery(session, "select * from TableNumber3").getResultId();
+               retrieveTableResult(executor, session, resultID);
+       }
+
        private void executeStreamQueryTable(
                        Map<String, String> replaceVars,
                        String query,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
index c7636cd..cf85011 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -22,7 +22,10 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.TypedResult;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
@@ -43,11 +46,14 @@ public class MaterializedCollectStreamResultTest {
        @Test
        public void testSnapshot() throws UnknownHostException {
                final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG);
+               TableSchema tableSchema = TableSchema.builder().fields(
+                               new String[]{"f0", "f1"}, new DataType[]{DataTypes.STRING(), DataTypes.BIGINT()}).build();
 
                TestMaterializedCollectStreamResult<?> result = null;
                try {
                        result = new TestMaterializedCollectStreamResult<>(
                                type,
+                               tableSchema,
                                new ExecutionConfig(),
                                InetAddress.getLocalHost(),
                                0,
@@ -91,11 +97,14 @@ public class MaterializedCollectStreamResultTest {
        @Test
        public void testLimitedSnapshot() throws UnknownHostException {
                final RowTypeInfo type = new RowTypeInfo(Types.STRING, Types.LONG);
+               TableSchema tableSchema = TableSchema.builder().fields(
+                               new String[]{"f0", "f1"}, new DataType[]{DataTypes.STRING(), DataTypes.BIGINT()}).build();
 
                TestMaterializedCollectStreamResult<?> result = null;
                try {
                        result = new TestMaterializedCollectStreamResult<>(
                                type,
+                               tableSchema,
                                new ExecutionConfig(),
                                InetAddress.getLocalHost(),
                                0,
@@ -146,6 +155,7 @@ public class MaterializedCollectStreamResultTest {
 
                public TestMaterializedCollectStreamResult(
                                RowTypeInfo outputType,
+                               TableSchema tableSchema,
                                ExecutionConfig config,
                                InetAddress gatewayAddress,
                                int gatewayPort,
@@ -154,6 +164,7 @@ public class MaterializedCollectStreamResultTest {
 
                        super(
                                outputType,
+                               tableSchema,
                                config,
                                gatewayAddress,
                                gatewayPort,
@@ -163,6 +174,7 @@ public class MaterializedCollectStreamResultTest {
 
                public TestMaterializedCollectStreamResult(
                                RowTypeInfo outputType,
+                               TableSchema tableSchema,
                                ExecutionConfig config,
                                InetAddress gatewayAddress,
                                int gatewayPort,
@@ -170,6 +182,7 @@ public class MaterializedCollectStreamResultTest {
 
                        super(
                                outputType,
+                               tableSchema,
                                config,
                                gatewayAddress,
                                gatewayPort,
diff --git a/flink-table/flink-sql-client/src/test/resources/test-data-1.csv b/flink-table/flink-sql-client/src/test/resources/test-data-1.csv
new file mode 100644
index 0000000..1794cb7
--- /dev/null
+++ b/flink-table/flink-sql-client/src/test/resources/test-data-1.csv
@@ -0,0 +1,18 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+123.123,abcd
\ No newline at end of file
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
index d4b8010..ff2ccea 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-catalogs.yaml
@@ -71,6 +71,26 @@ tables:
   - name: TestView2
     type: view
     query: SELECT * FROM default_catalog.default_database.TestView1
+  - name: TableNumber3
+    type: source-table
+    $VAR_UPDATE_MODE
+    schema:
+      - name: DecimalField
+        type: DECIMAL
+      - name: StringField
+        type: VARCHAR
+    connector:
+      type: filesystem
+      path: "$VAR_SOURCE_PATH2"
+    format:
+      type: csv
+      fields:
+        - name: DecimalField
+          type: DECIMAL
+        - name: StringField
+          type: VARCHAR
+      line-delimiter: "\n"
+      comment-prefix: "#"
 
 functions:
   - name: scalarUDF
@@ -98,6 +118,7 @@ functions:
         value: 5
 
 execution:
+  planner: "$VAR_PLANNER"
   type: "$VAR_EXECUTION_TYPE"
   time-characteristic: event-time
   periodic-watermarks-interval: 99
