Thanks Jark for the update.

However, getting back to the original question: can I use a nested column
directly in the PARTITIONED BY clause of CREATE TABLE, as below, without
declaring an additional column?

> CREATE TABLE output
> PARTITIONED BY (`location.transId`)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 'east-out',
>   'format' = 'json'
> ) LIKE navi (EXCLUDING ALL)
>

I also tried (`location`.transId), but it fails with an exception:

> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> SQL parse failed. Encountered "." at line 3, column 27.
> Was expecting one of:
>     ")" ...
>     "," ...
>
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered
> "." at line 3, column 27.
> Was expecting one of:
>     ")" ...
>     "," ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
> at
> org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
> at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
> at
> org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
> ... 3 more
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered
> "." at line 3, column 27.
> Was expecting one of:
>     ")" ...
>     "," ...
>
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
> at
> org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
> at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
> ... 5 more
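
For reference, here is a minimal Java sketch of how the two failures in this thread appear to arise. It is a toy model, not Flink's parser or validator: the class name `PartitionKeyCheck`, its methods, and the result strings are all hypothetical. It assumes only what the traces show: the parser is inside `ParenthesizedSimpleIdentifierList` when it rejects the ".", and the ValidationException quoted further down looks up the backticked name among the top-level columns ['type', 'location'].

```java
import java.util.List;
import java.util.regex.Pattern;

// Toy model only: NOT Flink's parser or validator. All names are hypothetical.
class PartitionKeyCheck {
    // A "simple identifier" here is either a bare name or exactly one
    // backtick-quoted name with no backtick inside (escapes are ignored).
    private static final Pattern SIMPLE =
        Pattern.compile("[A-Za-z_][A-Za-z0-9_]*|`[^`]+`");

    static boolean isSimpleIdentifier(String token) {
        return SIMPLE.matcher(token).matches();
    }

    // Strips the surrounding backticks from a quoted identifier.
    static String unquote(String token) {
        return token.startsWith("`") ? token.substring(1, token.length() - 1) : token;
    }

    // Classifies one PARTITIONED BY entry against the top-level column names.
    static String check(String token, List<String> topLevelColumns) {
        if (!isSimpleIdentifier(token)) {
            return "PARSE_ERROR";      // compound name such as `location`.transId
        }
        if (!topLevelColumns.contains(unquote(token))) {
            return "NOT_IN_SCHEMA";    // one identifier, but no such top-level column
        }
        return "OK";
    }

    public static void main(String[] args) {
        List<String> navi = List.of("type", "location");
        System.out.println(check("`location.transId`", navi));
        System.out.println(check("`location`.transId", navi));
        System.out.println(check("`partition`", List.of("type", "location", "partition")));
    }
}
```

Under this model only the name of a top-level column passes both checks. That is consistent with the computed-column declaration being accepted as DDL and failing only later, at insert time.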


Best,

Dongwon

On Wed, Jul 22, 2020 at 12:09 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Dongwon,
>
> I think this is a bug in the Filesystem connector which doesn't exclude
> the computed columns when building the TableSource.
> I created an issue [1] to track this problem.
>
> Best,
> Jark
>
> [1]: https://issues.apache.org/jira/browse/FLINK-18665
>
> On Tue, 21 Jul 2020 at 17:31, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi Danny,
>>
>>>  Which version did you use
>>
>> I use Flink 1.11.0.
>>
>>
>>>  what SQL context throws the error ?
>>
>> I think the declaration itself is not the problem.
>> The exception occurs when I execute the following, which I didn't
>> show you in the previous email:
>>
>>> tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
>>
>>
>> Thanks,
>>
>> Dongwon
>>
>> On Tue, Jul 21, 2020 at 6:16 PM Danny Chan <yuzhao....@gmail.com> wrote:
>>
>>> Hi, I executed the SQL below:
>>>
>>> val sql0 =
>>>   """
>>>     |create table navi (
>>>     |  a STRING,
>>>     |  location ROW<lastUpdateTime BIGINT, transId STRING>
>>>     |) with (
>>>     |  'connector' = 'filesystem',
>>>     |  'path' = 'east-out',
>>>     |  'format' = 'json'
>>>     |)
>>>     |""".stripMargin
>>> tableEnv.executeSql(sql0)
>>> val sql =
>>>   """
>>>     |CREATE TABLE output (
>>>     |  `partition` AS location.transId
>>>     |) PARTITIONED BY (`partition`)
>>>     |WITH (
>>>     |  'connector' = 'filesystem',
>>>     |  'path' = 'east-out',
>>>     |  'format' = 'json'
>>>     |) LIKE navi (EXCLUDING ALL)
>>>     |""".stripMargin
>>> tableEnv.executeSql(sql)
>>>
>>>
>>> On the master branch, both statements work. Can you share your stack trace
>>> details? Which version did you use, and what SQL context throws the error?
>>>
>>> Best,
>>> Danny Chan
>>> On Jul 21, 2020 at 4:55 PM +0800, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I want to create subdirectories named after values of a nested column,
>>> location.transId.
>>>
>>> This is my first attempt:
>>>
>>>> CREATE TABLE output
>>>> PARTITIONED BY (`location.transId`)
>>>> WITH (
>>>>   'connector' = 'filesystem',
>>>>   'path' = 'east-out',
>>>>   'format' = 'json'
>>>> ) LIKE navi (EXCLUDING ALL)
>>>>
>>>
>>> It fails with the following errors:
>>>
>>>> Exception in thread "main"
>>>> org.apache.flink.table.api.ValidationException: Partition column
>>>> 'location.transId' not defined in the table schema. Available columns:
>>>> ['type', 'location']
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>>>> at
>>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>>>> at
>>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>> at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>>>>
>>>
>>> Since nested columns do not seem to be recognized as eligible columns
>>> for PARTITIONED BY, I tried the following:
>>>
>>>> CREATE TABLE output (
>>>>   `partition` AS location.transId
>>>> ) PARTITIONED BY (`partition`)
>>>> WITH (
>>>>   'connector' = 'filesystem',
>>>>   'path' = 'east-out',
>>>>   'format' = 'json'
>>>> ) LIKE navi (EXCLUDING ALL)
>>>>
>>> It also fails:
>>>
>>>> Exception in thread "main"
>>>> org.apache.flink.table.api.ValidationException: The field count of logical
>>>> schema of the table does not match with the field count of physical schema.
>>>> The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
>>>> The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId`
>>>> STRING>,STRING].
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
>>>
>>>
