To make sure we are on the same page: the end goal is to have
CatalogTable#getTableSchema/TableSource#getTableSchema return a schema
that is compatible with TableSource#getProducedDataType.
If you want to use the new types, you should not implement
TableSource#getReturnType. Moreover, you should not use any Flink
utilities that convert from TypeInformation to DataTypes, as those
produce legacy types.
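
To illustrate, a minimal sketch of the kind of conversion to avoid
(TypeConversions and Types are Flink utility classes; the exact legacy
mapping can differ between versions):

// AVOID: round-tripping through TypeInformation yields a legacy ARRAY
// type, which the planner will not match against a schema declared
// with the new DataTypes.ARRAY(...).
TypeInformation<String[]> legacyInfo = Types.OBJECT_ARRAY(Types.STRING);
DataType legacyType = TypeConversions.fromLegacyInfoToDataType(legacyInfo);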
I am aware there are a lot of corner cases, and we worked hard to
improve the situation with the new source and sink interfaces.
Below is an example of how you could pass different array types:
import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;

StreamExecutionEnvironment exec =
    StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(exec);
tEnv.registerTableSource(
    "T",
    new StreamTableSource<Row>() {
        @Override
        public TableSchema getTableSchema() {
            return TableSchema.builder()
                .field("f0", DataTypes.ARRAY(DataTypes.BIGINT().notNull()))
                .field("f1", DataTypes.ARRAY(DataTypes.BIGINT()))
                .field("f2", DataTypes.ARRAY(DataTypes.STRING()))
                .build();
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.fromCollection(
                Arrays.asList(
                    Row.of(new long[]{1}, new Long[]{1L}, new String[]{"ABCDE"})),
                // this is necessary for the STRING array, because otherwise
                // the DataStream produces a different TypeInformation than
                // the planner expects
                (TypeInformation<Row>)
                    TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType())
            );
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.ROW(
                DataTypes.FIELD(
                    "f0",
                    DataTypes.ARRAY(
                        DataTypes.BIGINT().notNull().bridgedTo(long.class))
                        .bridgedTo(long[].class)),
                DataTypes.FIELD(
                    "f1",
                    DataTypes.ARRAY(DataTypes.BIGINT())),
                DataTypes.FIELD(
                    "f2",
                    DataTypes.ARRAY(DataTypes.STRING()))
            );
        }
    });

Table table = tEnv.sqlQuery("SELECT f0, f1, f2 FROM T");
DataStream<Row> result = tEnv.toAppendStream(
    table,
    Types.ROW(
        Types.PRIMITIVE_ARRAY(Types.LONG),
        ObjectArrayTypeInfo.getInfoFor(Types.LONG),
        ObjectArrayTypeInfo.getInfoFor(Types.STRING)));
result.print();
exec.execute();
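
A side note on the bridging above: f0 is declared as a primitive long[]
array, which is why the explicit bridgedTo(long.class) /
bridgedTo(long[].class) calls are needed. If your records stick to the
schema's default conversion classes (e.g. Long[] instead of long[]), the
simpler variant from my earlier mail below, deriving the produced type
straight from the schema, could look like this sketch (assuming the
defaults fit your data):

@Override
public DataType getProducedDataType() {
    // Matches the schema by construction; works only if the DataStream
    // records use the default conversion classes of the schema's types.
    return getTableSchema().toRowDataType();
}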
Hope this helps, and that it will be much easier in Flink 1.11.
Best,
Dawid
On 05/06/2020 13:33, Ramana Uppala wrote:
> Hi Dawid,
>
> We are using a custom connector that is very similar to the Flink Kafka
> connector, and we instantiate the TableSchema using a custom class which
> maps Avro types to Flink's DataTypes using TableSchema.Builder.
>
> For Array type, we have below mapping:
>
> case ARRAY:
>     return DataTypes.ARRAY(toFlinkType(schema.getElementType()));
>
>
> We are using the Hive catalog and creating tables
> using CatalogTableImpl with TableSchema.
>
> As you mentioned, if we create the TableSchema with legacy types, our
> connector works without any issues. But we want to use the new Flink
> DataTypes API and are having issues.
>
> Also, one more observation: if we use legacy types in TableSource
> creation, the application does not work with the Blink planner. We are
> getting the same error about the physical type not matching.
>
> Looking forward to the 1.11 changes.
>
>
> On Fri, Jun 5, 2020 at 3:34 AM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
>
> Hi Ramana,
>
> What connector do you use or how do you instantiate the TableSource?
> Also which catalog do you use and how do you register your table
> in that
> catalog?
>
> The problem is that the conversion from TypeInformation to DataType
> produces legacy types (because they cannot be mapped exactly 1-1 to
> the new types).
>
> If you can change the code of the TableSource, you can return
> tableSchema.toRowDataType from TableSource#getProducedDataType, where
> tableSchema is the schema coming from the catalog. Or you can make
> sure that the catalog table produces the legacy type:
>
> TableSchema.field("field", Types.OBJECT_ARRAY(Types.STRING));
>
> In 1.11 we will introduce new sources and formats that already work
> entirely with the new type system (AvroRowDataDeserializationSchema
> and KafkaDynamicTable).
>
> Hope this helps a bit.
>
> Best,
>
> Dawid
>
> On 04/06/2020 13:43, Ramana Uppala wrote:
> > Hi,
> > Our Avro schema contains an Array type, and we created a TableSchema
> > out of the AvroSchema and created a table in the catalog. In