Re: [External Sender] Re: Avro Array type validation error

2020-06-09 Thread Dawid Wysakowicz
To make sure we are on the same page.

The end goal is to have

CatalogTable#getTableSchema / TableSource#getTableSchema return a schema
that is compatible with TableSource#getProducedDataType.

If you want to use the new types, you should not implement
TableSource#getReturnType. Moreover, you should not use any Flink
utilities that convert from TypeInformation to DataTypes, as those
produce legacy types.
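
To illustrate the difference with a rough sketch (assuming the
TypeConversions utility and the OBJECT_ARRAY type information that e.g.
AvroSchemaConverter produces):

// Converting TypeInformation into a DataType goes through the legacy bridge
// and yields a type like LEGACY('ARRAY', 'ANY<[Ljava.lang.String;...'),
// exactly the kind of type the planner later fails to match against the
// physical type.
DataType legacyArray =
    TypeConversions.fromLegacyInfoToDataType(Types.OBJECT_ARRAY(Types.STRING));

// Declaring the schema directly with the DataTypes API stays in the new type
// system, which is what the planner expects.
DataType newArray = DataTypes.ARRAY(DataTypes.STRING());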

I am aware there are a lot of corner cases, and we have worked hard to improve
the situation with the new source and sink interfaces.

Below is an example of how you could pass different array types:

StreamExecutionEnvironment exec = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(exec);
tEnv.registerTableSource(
    "T",
    new StreamTableSource<Row>() {
        @Override
        public TableSchema getTableSchema() {
            return TableSchema.builder()
                .field("f0", DataTypes.ARRAY(DataTypes.BIGINT().notNull()))
                .field("f1", DataTypes.ARRAY(DataTypes.BIGINT()))
                .field("f2", DataTypes.ARRAY(DataTypes.STRING()))
                .build();
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.fromCollection(
                Arrays.asList(Row.of(new long[]{1}, new Long[]{1L}, new String[]{"ABCDE"})),
                // this is necessary for the STRING array, because otherwise the
                // DataStream produces a different TypeInformation than the planner expects
                (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType())
            );
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.ROW(
                DataTypes.FIELD(
                    "f0",
                    DataTypes.ARRAY(DataTypes.BIGINT().notNull().bridgedTo(long.class))
                        .bridgedTo(long[].class)),
                DataTypes.FIELD(
                    "f1",
                    DataTypes.ARRAY(DataTypes.BIGINT())),
                DataTypes.FIELD(
                    "f2",
                    DataTypes.ARRAY(DataTypes.STRING())));
        }
    });

Table table = tEnv.sqlQuery("SELECT f0, f1, f2 FROM T");
DataStream<Row> result = tEnv.toAppendStream(
    table,
    Types.ROW(
        Types.PRIMITIVE_ARRAY(Types.LONG),
        ObjectArrayTypeInfo.getInfoFor(Types.LONG),
        ObjectArrayTypeInfo.getInfoFor(Types.STRING)));
result.print();
exec.execute();
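
A note on the example: the bridgedTo(long.class)/bridgedTo(long[].class) calls
declare which physical Java classes back the logical types, and the
fromDataTypeToLegacyInfo conversion in getDataStream is only there so that the
DataStream's TypeInformation matches what the planner derives from
getProducedDataType.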

Hope this helps, and that it will be much easier in Flink 1.11.

Best,

Dawid

On 05/06/2020 13:33, Ramana Uppala wrote:
> Hi Dawid,
>
> We are using a custom connector that is very similar to the Flink Kafka
> connector, and we instantiate the TableSchema using a custom class which
> maps Avro types to Flink's DataTypes via TableSchema.Builder.
>
> For the Array type, we have the mapping below:
>
> case ARRAY:
>     return DataTypes.ARRAY(toFlinkType(schema.getElementType()));
>
>
> We are using Hive Catalog and creating tables
> using CatalogTableImpl with TableSchema.
>
> As you mentioned, if we create the TableSchema with legacy types, our
> connector works without any issues. But we want to use the new Flink
> DataTypes API and are running into issues.
>
> Also, one more observation: if we use legacy types in TableSource
> creation, the application does not work with the Blink planner. We are
> getting the same error about the physical type not matching.
>
> Looking forward to the 1.11 changes.
>
>
> On Fri, Jun 5, 2020 at 3:34 AM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
>
> Hi Ramana,
>
> What connector do you use or how do you instantiate the TableSource?
> Also which catalog do you use and how do you register your table
> in that
> catalog?
>
> The problem is that the conversion from TypeInformation to DataType
> produces legacy types (because they cannot be mapped exactly 1-to-1 to
> the new types).
>
> If you can change the code of the TableSource, you can return
> tableSchema.toRowDataType() from TableSource#getProducedDataType, where
> the tableSchema is the schema coming from the catalog. Or you can make
> sure that the catalog table produces the legacy type:
>
> TableSchema.field("field", Types.OBJECT_ARRAY(Types.STRING));
>
> In 1.11 we will introduce new sources and formats that already work
> entirely with the new type system (AvroRowDataDeserializationSchema and
> KafkaDynamicTable).
>
> Hope this helps a bit.
>
> Best,
>
> Dawid
>
> On 04/06/2020 13:43, Ramana Uppala wrote:
> > Hi,
> > The Avro schema contains an Array type; we created a TableSchema out of
> > the AvroSchema and created a table in the catalog. In 

Re: [External Sender] Re: Avro Array type validation error

2020-06-05 Thread Ramana Uppala
Hi Dawid,

We are using a custom connector that is very similar to the Flink Kafka
connector, and we instantiate the TableSchema using a custom class which maps
Avro types to Flink's DataTypes via TableSchema.Builder.

For the Array type, we have the mapping below:

case ARRAY:
    return DataTypes.ARRAY(toFlinkType(schema.getElementType()));
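
In case it helps, here is a trimmed-down sketch of what that mapping method
looks like (the non-array cases shown here are simplified placeholders, not our
exact code):

import org.apache.avro.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// simplified recursive mapping from an Avro schema to Flink's new DataTypes
static DataType toFlinkType(Schema schema) {
    switch (schema.getType()) {
        case STRING:
            return DataTypes.STRING();
        case LONG:
            return DataTypes.BIGINT();
        case ARRAY:
            // the element type is mapped recursively
            return DataTypes.ARRAY(toFlinkType(schema.getElementType()));
        default:
            throw new UnsupportedOperationException(
                "Unsupported Avro type: " + schema.getType());
    }
}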


We are using Hive Catalog and creating tables using CatalogTableImpl with
TableSchema.
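
Roughly, the table registration looks like the following (a simplified sketch;
the schema, database name, table name, and properties here are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;

static void registerAvroBackedTable(Catalog hiveCatalog) throws Exception {
    // schema built from the Avro schema via the mapping above
    TableSchema schema = TableSchema.builder()
        .field("names", DataTypes.ARRAY(DataTypes.STRING()))
        .build();

    // connector properties (placeholder values)
    Map<String, String> properties = new HashMap<>();
    properties.put("connector.type", "our-custom-connector");

    hiveCatalog.createTable(
        new ObjectPath("default", "my_table"),
        new CatalogTableImpl(schema, properties, "table created from an Avro schema"),
        false); // ignoreIfExists
}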

As you mentioned, if we create the TableSchema with legacy types, our
connector works without any issues. But we want to use the new Flink
DataTypes API and are running into issues.

Also, one more observation: if we use legacy types in TableSource creation,
the application does not work with the Blink planner. We are getting the
same error about the physical type not matching.

Looking forward to the 1.11 changes.


On Fri, Jun 5, 2020 at 3:34 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Ramana,
>
> What connector do you use or how do you instantiate the TableSource?
> Also which catalog do you use and how do you register your table in that
> catalog?
>
> The problem is that the conversion from TypeInformation to DataType produces
> legacy types (because they cannot be mapped exactly 1-to-1 to the new types).
>
> If you can change the code of the TableSource, you can return
> tableSchema.toRowDataType() from TableSource#getProducedDataType, where the
> tableSchema is the schema coming from the catalog. Or you can make sure that
> the catalog table produces the legacy type:
>
> TableSchema.field("field", Types.OBJECT_ARRAY(Types.STRING));
>
> In 1.11 we will introduce new sources and formats that already work
> entirely with the new type system (AvroRowDataDeserializationSchema and
> KafkaDynamicTable).
>
> Hope this helps a bit.
>
> Best,
>
> Dawid
>
> On 04/06/2020 13:43, Ramana Uppala wrote:
> > Hi,
> > The Avro schema contains an Array type; we created a TableSchema out of
> > the AvroSchema and created a table in the catalog. In the catalog, this
> > specific field type is shown as ARRAY. We are using
> > AvroRowDeserializationSchema with the connector, and the returnType of the
> > TableSource shows the Array mapped to LEGACY('ARRAY',
> > 'ANY<[Ljava.lang.String;, by AvroSchemaConverter
> >
> > When running the application, the planner validates physical types and
> > logical types, and we are getting the error below:
> >
> > of table field 'XYZ' does not match with the physical type ROW<
> >
> > Any suggestions on how to resolve this? Is this a bug?
>
>
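
For completeness, a minimal sketch of the first suggestion quoted above, i.e.
deriving the produced data type from the catalog schema (the class name is a
placeholder and the actual connector logic is omitted):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class AvroBackedTableSource implements StreamTableSource<Row> {

    // schema coming from the catalog table
    private final TableSchema tableSchema;

    public AvroBackedTableSource(TableSchema tableSchema) {
        this.tableSchema = tableSchema;
    }

    @Override
    public TableSchema getTableSchema() {
        return tableSchema;
    }

    @Override
    public DataType getProducedDataType() {
        // reuse the catalog schema instead of converting a TypeInformation,
        // so the logical and physical types cannot drift apart
        return tableSchema.toRowDataType();
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // actual connector logic omitted in this sketch
        throw new UnsupportedOperationException("omitted");
    }
}

With this, getTableSchema() and getProducedDataType() always stay consistent,
which is the end goal described at the top of the thread.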
