Re: [External Sender] Re: Flink sql nested elements

2020-06-09 Thread Ramana Uppala
Hi Dawid,

This issue has been resolved.

From our debugging we found that the Calcite parser was able to resolve the 
nested elements as expected, but it expected the case to match the schema. In 
this scenario, the field case in our SQL SELECT did not match the field case in 
the schema. After fixing the SQL to use the correct case, the query worked as 
expected.

Is Flink SQL case sensitive? We don't see any documentation related to this.

It would be great if Flink could convert all query elements to lower case, 
similar to Hive.
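
For illustration, a sketch of the mismatch we hit (column and table names are 
taken from the example schema and SQL later in this thread):

```sql
-- Fails validation: `postaladdress` does not match the declared column `postalAddress`
SELECT postaladdress.addressLine1 FROM myStream;

-- Works: the casing matches the schema exactly
SELECT postalAddress.addressLine1 FROM myStream;
```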

On 2020/06/09 07:58:20, Dawid Wysakowicz  wrote: 
> Hi Ramana,
> 
> Could you help us with a way to reproduce the behaviour? I could not
> reproduce it locally. The code below works for me just fine:
> 
> StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(
>         exec,
>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
> tEnv.registerTableSource(
>         "T",
>         new StreamTableSource<Row>() {
>             @Override
>             public TableSchema getTableSchema() {
>                 return TableSchema.builder()
>                         .field("f3", DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING())))
>                         .build();
>             }
>             @Override
>             public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
>                 return execEnv.fromCollection(
>                         Arrays.asList(Row.of(Row.of("ABCDE")))
>                 );
>             }
>             @Override
>             public DataType getProducedDataType() {
>                 return DataTypes.ROW(
>                         DataTypes.FIELD(
>                                 "f3",
>                                 DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING()))
>                         )
>                 );
>             }
>         });
> Table table = tEnv.sqlQuery("SELECT f3.nested FROM T");
> DataStream<Row> result = tEnv.toAppendStream(
>         table,
>         Types.ROW(Types.STRING()));
> result.print();
> exec.execute();
> 
> Best,
> 
> Dawid
> 
> On 05/06/2020 13:59, Ramana Uppala wrote:
> > Hi Leonard,
> >
> > We are using Flink 1.10, and I cannot share the complete schema, but it
> > looks like the following in the Hive Catalog:
> >
> > flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
> > `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
> > VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
> >
> > Based on the stack trace, the sqlUpdate API validates the SQL statement
> > and throws the above error. Do we need any Calcite configuration to
> > support nested types?
> >
> > Thanks,
> > Ramana.
> >
> > On Fri, Jun 5, 2020 at 12:49 AM Leonard Xu wrote:
> >
> > Hi, Ramana
> >
> > For nested data types, Flink uses dot notation (e.g. a.b.c) to access
> > nested elements. Your SQL syntax looks right. Which Flink version are you
> > using? And could you post your Avro schema file and DDL?
> >
> > Best,
> > Leonard Xu
> >
> > > On Jun 5, 2020, at 03:34, Ramana Uppala wrote:
> > >
> > > We have an Avro schema that contains a nested structure, and when
> > > querying it using Flink SQL, we get the error below.
> > >
> > > Exception in thread "main" java.lang.AssertionError
> > >       at org.apache.calcite.sql.parser.SqlParserPos.sum_(SqlParserPos.java:236)
> > >       at org.apache.calcite.sql.parser.SqlParserPos.sum(SqlParserPos.java:226)
> > >       at org.apache.calcite.sql.SqlIdentifier.getComponent(SqlIdentifier.java:232)
> > >       at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:416)
> > >       at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5733)
> > >       at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5718)
> > >       at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
> > >
> > > Example Schema:
> > > ROW<`col1` VARCHAR(2147483647), `postalAddress`
> > ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
> > VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
> > >
> > > Example SQL:
> > > insert into CSVSink
> > > select
> > > col1,
> > > postalAddress.addressLine1 as address
> > > from myStream
> > >
> > > How do we select nested elements in Flink SQL?
> > >
> >

Re: [External Sender] Re: Flink sql nested elements

2020-06-09 Thread Dawid Wysakowicz
Hi Ramana,

Could you help us with a way to reproduce the behaviour? I could not
reproduce it locally. The code below works for me just fine:

StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
        exec,
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
tEnv.registerTableSource(
        "T",
        new StreamTableSource<Row>() {
            @Override
            public TableSchema getTableSchema() {
                return TableSchema.builder()
                        .field("f3", DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING())))
                        .build();
            }
            @Override
            public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
                return execEnv.fromCollection(
                        Arrays.asList(Row.of(Row.of("ABCDE")))
                );
            }
            @Override
            public DataType getProducedDataType() {
                return DataTypes.ROW(
                        DataTypes.FIELD(
                                "f3",
                                DataTypes.ROW(DataTypes.FIELD("nested", DataTypes.STRING()))
                        )
                );
            }
        });
Table table = tEnv.sqlQuery("SELECT f3.nested FROM T");
DataStream<Row> result = tEnv.toAppendStream(
        table,
        Types.ROW(Types.STRING()));
result.print();
exec.execute();

Best,

Dawid

On 05/06/2020 13:59, Ramana Uppala wrote:
> Hi Leonard,
>
> We are using Flink 1.10, and I cannot share the complete schema, but it
> looks like the following in the Hive Catalog:
>
> flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
> `postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
> VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
>
> Based on the stack trace, the sqlUpdate API validates the SQL statement
> and throws the above error. Do we need any Calcite configuration to
> support nested types?
>
> Thanks,
> Ramana.
>
> On Fri, Jun 5, 2020 at 12:49 AM Leonard Xu wrote:
>
> Hi, Ramana
>
> For nested data types, Flink uses dot notation (e.g. a.b.c) to access
> nested elements. Your SQL syntax looks right. Which Flink version are you
> using? And could you post your Avro schema file and DDL?
>
> Best,
> Leonard Xu
>
> > On Jun 5, 2020, at 03:34, Ramana Uppala wrote:
> >
> > We have an Avro schema that contains a nested structure, and when
> > querying it using Flink SQL, we get the error below.
> >
> > Exception in thread "main" java.lang.AssertionError
> >       at org.apache.calcite.sql.parser.SqlParserPos.sum_(SqlParserPos.java:236)
> >       at org.apache.calcite.sql.parser.SqlParserPos.sum(SqlParserPos.java:226)
> >       at org.apache.calcite.sql.SqlIdentifier.getComponent(SqlIdentifier.java:232)
> >       at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:416)
> >       at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5733)
> >       at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5718)
> >       at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
> >
> > Example Schema:
> > ROW<`col1` VARCHAR(2147483647), `postalAddress`
> ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
> VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
> >
> > Example SQL:
> > insert into CSVSink
> > select
> > col1,
> > postalAddress.addressLine1 as address
> > from myStream
> >
> > How do we select nested elements in Flink SQL?
> >
>
> 
>
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The
> information transmitted herewith is intended only for use by the
> individual or entity to which it is addressed. If the reader of this
> message is not the intended recipient, you are hereby notified that
> any review, retransmission, dissemination, distribution, copying or
> other use of, or taking of any action in reliance upon this
> information is strictly prohibited. If you have received this
> communication in error, please contact the sender and delete the
> material from your computer.
>
>


signature.asc
Description: OpenPGP digital signature


Re: [External Sender] Re: Flink sql nested elements

2020-06-05 Thread Ramana Uppala
Hi Leonard,

We are using Flink 1.10, and I cannot share the complete schema, but it looks
like the following in the Hive Catalog:

flink.generic.table.schema.1.data-type ROW<`col1` VARCHAR(2147483647),
`postalAddress` ROW<`addressLine1` VARCHAR(2147483647), `addressLine2`
VARCHAR(2147483647), `addressLine3` VARCHAR(2147483647)>>
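
In DDL form, the catalog entry above corresponds to something like the sketch
below (the table name is hypothetical and the connector options are omitted;
only the nested ROW type is taken from the catalog entry):

```sql
CREATE TABLE myStream (
  col1 STRING,
  postalAddress ROW<
    addressLine1 STRING,
    addressLine2 STRING,
    addressLine3 STRING
  >
) WITH (
  -- connector options omitted
);
```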

Based on the stack trace, the sqlUpdate API validates the SQL statement and
throws the above error. Do we need any Calcite configuration to support
nested types?

Thanks,
Ramana.

On Fri, Jun 5, 2020 at 12:49 AM Leonard Xu wrote:

> Hi, Ramana
>
> For nested data types, Flink uses dot notation (e.g. a.b.c) to access nested
> elements. Your SQL syntax looks right. Which Flink version are you using? And
> could you post your Avro schema file and DDL?
>
> Best,
> Leonard Xu
>
> > On Jun 5, 2020, at 03:34, Ramana Uppala wrote:
> >
> > We have an Avro schema that contains a nested structure, and when
> > querying it using Flink SQL, we get the error below.
> >
> > Exception in thread "main" java.lang.AssertionError
> >   at org.apache.calcite.sql.parser.SqlParserPos.sum_(SqlParserPos.java:236)
> >   at org.apache.calcite.sql.parser.SqlParserPos.sum(SqlParserPos.java:226)
> >   at org.apache.calcite.sql.SqlIdentifier.getComponent(SqlIdentifier.java:232)
> >   at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:416)
> >   at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5733)
> >   at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5718)
> >   at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
> >
> > Example Schema:
> > ROW<`col1` VARCHAR(2147483647), `postalAddress` ROW<`addressLine1`
> VARCHAR(2147483647), `addressLine2` VARCHAR(2147483647), `addressLine3`
> VARCHAR(2147483647)>>
> >
> > Example SQL:
> > insert into CSVSink
> > select
> > col1,
> > postalAddress.addressLine1 as address
> > from myStream
> >
> > How do we select nested elements in Flink SQL?
> >
>
>
