Hi Timo,
I will look at the Catalog interface and see what it requires.

For the meanwhile I'll go back to the old API. Do you expect FLIP-136 to
resolve the issue around legacy types? Will it's implementation allow to
register LEGACY types? or a new variation of them?

On Mon, Dec 28, 2020 at 12:45 PM Timo Walther <[email protected]> wrote:

> I would recommend to use the old UDF stack for now. You can simply call
> `StreamTableEnvironment.registerFunction` instead of
> `createTemporarySystemFunction`. Then the UDF returns a legacy type that
> the DataStream API understands.
>
> Have you thought about implementing your own catalog instead of
> generating CREATE TABLE statements? The catalog API seems a bit complex
> at first glance but only requires to implement a couple of methods. In
> this case you can implement your own `CatalogTable` which is the parsed
> representation of a `CREATE TABLE` statement. In this case you would
> have the full control over the entire type stack end-to-end.
>
> Regards,
> Timo
>
> On 28.12.20 10:36, Yuval Itzchakov wrote:
> > Timo, an additional question.
> >
> > I am currently using TypeConversions.fromLegacyInfoToDataType. However,
> > this still does not allow me to register this table with the catalog
> > since any representation of LEGACY isn't supported (I don't see it
> > generating any other DataType other than RAW(.., LEGACY). Is there any
> > way I can preserve the underlying type and still register the column
> > somehow with CREATE TABLE?
> >
> > On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> >     Hi Timo,
> >
> >     Thanks for the explanation. Passing around a byte array is not
> >     possible since I need to know the concrete type later for
> >     serialization in my sink, so I need to keep the type.
> >
> >     What I'm trying to achieve is the following:
> >     I have a scalar UDF function:
> >
> >     image.png
> >
> >     This function is later used in processing of a Flink table, by
> >     calling PARSE_JSON on the selected field.
> >     Whoever uses these UDFs isn't aware of any Flink syntax for creating
> >     and registering tables, I generate the CREATE TABLE statement behind
> >     the scenes, dynamically.
> >     for each table. In order for this to work, I need the UDF to output
> >     the correct type (in this example, io.circe.Json). Later, this JSON
> >     type (or any other type returned by the UDF for that matter) will
> >     get serialized in a custom sink and into a data warehouse (that's
> >     why I need to keep the concrete type, since serialization happens at
> >     the edges before writing it out).
> >
> >     The reason I'm converting between Table and DataStream is that this
> >     table will be further manipulated before being written to the custom
> >     DynamicTableSink by applying transformations on a DataStream[Row],
> >     and then converted back to a Table to be used in an additional
> >     CREATE TABLE and then INSERT INTO statement.
> >
> >     This is why I have these conversions back and forth, and why I
> >     somehow need a way to register the legacy types as a valid type of
> >     the table.
> >     Hope that clarifies a bit, since the pipeline is rather complex I
> >     can't really share a MVCE of it.
> >
> >     On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <[email protected]
> >     <mailto:[email protected]>> wrote:
> >
> >         Hi Yuval,
> >
> >         the legacy type has no string representation that can be used in
> >         a SQL
> >         DDL statement. The current string representation LEGACY(...) is
> >         only a
> >         temporary work around to persist the old types in catalogs.
> >
> >         Until FLIP-136 is fully implemented,
> toAppendStream/toRetractStream
> >         support only legacy type info. So I would recommend to use the
> >         legacy
> >         type in the UDF return type as well. Either you use the old
> >         `getResultType` method or you override `getTypeInference` and
> call
> >         `TypeConversions.fromLegacyInfoToDataType`.
> >
> >         Another work around could be that you simply use `BYTES` as the
> >         return
> >         type and pass around a byte array instead.
> >
> >         Maybe you can show us a little end-to-end example, what you are
> >         trying
> >         to achieve?
> >
> >         Regards,
> >         Timo
> >
> >
> >         On 28.12.20 07:47, Yuval Itzchakov wrote:
> >          > Hi Danny,
> >          >
> >          > Yes, I tried implementing the DataTypeFactory for the UDF
> using
> >          > TypeInformationRawType (which is deprecated BTW, and there's
> >         no support
> >          > for RawType in the conversion), didn't help.
> >          >
> >          > I did manage to get the conversion working using
> >          > TableEnvironment.toAppendStream (I was previously directly
> >         calling
> >          > TypeConversions) but still remains the problem that Flink
> >         can't register
> >          > LEGACY types via the CREATE TABLE DDL
> >          >
> >          > On Mon, Dec 28, 2020, 04:25 Danny Chan <[email protected]
> >         <mailto:[email protected]>
> >          > <mailto:[email protected] <mailto:[email protected]>>>
> >         wrote:
> >          >
> >          >      > SQL parse failed. Encount
> >          >     What syntax did you use ?
> >          >
> >          >      > TypeConversions.fromDataTypeToLegacyInfo cannot
> >         convert a plain
> >          >     RAW type back to TypeInformation.
> >          >
> >          >     Did you try to construct type information by a new
> >          >     fresh TypeInformationRawType ?
> >          >
> >          >     Yuval Itzchakov <[email protected]
> >         <mailto:[email protected]> <mailto:[email protected]
> >         <mailto:[email protected]>>> 于
> >          >     2020年12月24日周四 下午7:24写道:
> >          >
> >          >         An expansion to my question:
> >          >
> >          >         What I really want is for the UDF to return
> >         `RAW(io.circe.Json,
> >          >         ?)` type, but I have to do a conversion between Table
> and
> >          >         DataStream, and
> >         TypeConversions.fromDataTypeToLegacyInfo cannot
> >          >         convert a plain RAW type back to TypeInformation.
> >          >
> >          >         On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov
> >          >         <[email protected] <mailto:[email protected]>
> >         <mailto:[email protected] <mailto:[email protected]>>> wrote:
> >          >
> >          >             Hi,
> >          >
> >          >             I have a UDF which returns a type of MAP<STRING,
> >          >             LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try
> >         to register
> >          >             this type with Flink via the CREATE TABLE DDL, I
> >         encounter
> >          >             an exception:
> >          >
> >          >             - SQL parse failed. Encountered "(" at line 2,
> >         column 256.
> >          >             Was expecting one of:
> >          >                  "NOT" ...
> >          >                  "NULL" ...
> >          >                  ">" ...
> >          >                  "MULTISET" ...
> >          >                  "ARRAY" ...
> >          >                  "." ...
> >          >
> >          >             Which looks like the planner doesn't like the
> >         round brackets
> >          >             on the LEGACY type. What is the correct way to
> >         register the
> >          >             table with this type with Flink?
> >          >             --
> >          >             Best Regards,
> >          >             Yuval Itzchakov.
> >          >
> >          >
> >          >
> >          >         --
> >          >         Best Regards,
> >          >         Yuval Itzchakov.
> >          >
> >
> >
> >
> >     --
> >     Best Regards,
> >     Yuval Itzchakov.
> >
> >
> >
> > --
> > Best Regards,
> > Yuval Itzchakov.
>
>

-- 
Best Regards,
Yuval Itzchakov.

Reply via email to