Hi Simon,

First of all for more thorough discussion you might want to have a look
at this thread:
https://lists.apache.org/thread.html/b450df1a7bf10187301820e529cbc223ce63f233c1af0f0c7415e62b@%3Cdev.flink.apache.org%3E

TL;DR; All objects registered with registerTable/registerTableSource are
temporary objects that do not have serializable form and therefore can
only be stored in an in-memory catalog. The useCatalog/useDatabase are
experimental APIs in the upcoming 1.9 release.

If you want to be sure that tables are stored in a given catalog you can
either register it directly via tEnv.getCatalog().createTable() or you
can try using SQL DDL.

Best,

Dawid

On 12/08/2019 09:37, Simon Su wrote:
> Hi All
>     I want to use a custom catalog by setting the name “ca1” and
> create a database under this catalog. When I submit the 
> SQL, and it raises the error like :
>
>     Exception in thread "main"
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 1, column 98 to line 1, column 116: Object 'orderstream' not
> found within 'ca1.db1'
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:154)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:89)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:130)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)
> at sqlrunner.RowTimeTest.memoryCatalog(RowTimeTest.java:126)
> at sqlrunner.RowTimeTest.main(RowTimeTest.java:137)
> Caused by: org.apache.calcite.runtime.CalciteContextException: From
> line 1, column 98 to line 1, column 116: Object 'orderstream' not
> found within 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:809)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4805)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:166)
> at
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:122)
> ... 7 more
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
> Object 'orderstream' not found within 'ca1.db1'
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
> at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
> ... 26 more
>    
> It seems that Calcite cannot find the source object as expected, After
> I debug the code I found that when using tableEnv.registerTableSource
> or registerTableSink, It will use a build-in catalog with a hard-code
> catalog name ( default-catalog ) and database name ( default_database
> ) while tableEnv.registerCatalog here cannot change this behaviros, So
> is this a reasonable behaviors ? If I don’t want to use default
> build-in catalog and database, is there any other ways to do this ?
>
>    GenericInMemoryCatalog catalog = new GenericInMemoryCatalog("ca1");
> tableEnv.registerCatalog(catalog.getName(), catalog); // Not work to change 
> build-in catalog !! tableEnv.useCatalog(catalog.getName()); 
> catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), 
> "comment"), true); tableEnv.useDatabase("db1"); tableEnv.connect(sourceKafka) 
> .withFormat(csv) .withSchema(schema2) .inAppendMode() 
> .registerTableSource("orderstream"); tableEnv.connect(sinkKafka) 
> .withFormat(csv) .withSchema(schema2) .inAppendMode() 
> .registerTableSink("sinkstream");; String sql = "insert into 
> ca1.db1.sinkstream " + "select tumble_start(ts,
> INTERVAL '5' SECOND) as t, max(data) from ca1.db1.orderstream " +
> "group by tumble(ts, INTERVAL '5' SECOND), data"; tableEnv.sqlUpdate(sql); 
> tableEnv.execute("test");
>
> Thanks,
> SImon
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to