I see, alright. Thank you for all the help!

On Mon, Dec 28, 2020 at 3:57 PM Timo Walther wrote:

> Hi Yuval,
>
> FLIP-136 will resolve the remaining legacy type issues for DataStream API. After that we can drop legacy types because everything can be expressed with DataType. DataType is a super type of TypeInformation.
>
> Registering types (e.g. giving a RAW type backed by io.circe.Json a name such that it can be used in a DDL statement) is also in the backlog but requires another FLIP.
>
> Regards,
> Timo
>
> On 28.12.20 13:04, Yuval Itzchakov wrote:
> > Hi Timo,
> >
> > I will look at the Catalog interface and see what it requires.
> >
> > In the meantime I'll go back to the old API. Do you expect FLIP-136 to resolve the issue around legacy types? Will its implementation allow registering LEGACY types, or a new variation of them?
> >
> > On Mon, Dec 28, 2020 at 12:45 PM Timo Walther wrote:
> > > I would recommend using the old UDF stack for now. You can simply call `StreamTableEnvironment.registerFunction` instead of `createTemporarySystemFunction`. Then the UDF returns a legacy type that the DataStream API understands.
> > >
> > > Have you thought about implementing your own catalog instead of generating CREATE TABLE statements? The catalog API seems a bit complex at first glance but only requires implementing a couple of methods. In this case you can implement your own `CatalogTable`, which is the parsed representation of a `CREATE TABLE` statement. You would then have full control over the entire type stack end-to-end.
> > >
> > > Regards,
> > > Timo
> > >
> > > On 28.12.20 10:36, Yuval Itzchakov wrote:
> > > > Timo, an additional question.
> > > >
> > > > I am currently using TypeConversions.fromLegacyInfoToDataType. However, this still does not allow me to register this table with the catalog, since any representation of LEGACY isn't supported (I don't see it generating any DataType other than RAW(.., LEGACY)). Is there any way I can preserve the underlying type and still register the column somehow with CREATE TABLE?
> > > >
> > > > On Mon, Dec 28, 2020 at 11:22 AM Yuval Itzchakov wrote:
> > > > > Hi Timo,
> > > > >
> > > > > Thanks for the explanation. Passing around a byte array is not possible since I need to know the concrete type later for serialization in my sink, so I need to keep the type.
> > > > >
> > > > > What I'm trying to achieve is the following:
> > > > > I have a scalar UDF function:
> > > > >
> > > > > image.png
> > > > >
> > > > > This function is later used in processing of a Flink table, by calling PARSE_JSON on the selected field. Whoever uses these UDFs isn't aware of any Flink syntax for creating and registering tables; I generate the CREATE TABLE statement behind the scenes, dynamically, for each table. In order for this to work, I need the UDF to output the correct type (in this example, io.circe.Json). Later, this JSON type (or any other type returned by the UDF for that matter) will get serialized in a custom sink and into a data warehouse (that's why I need to keep the concrete type, since serialization happens at the edges before writing it out).
> > > > >
> > > > > The reason I'm converting between Table and DataStream is that this table will be further manipulated before being written to the custom DynamicTableSink by applying transformations on a DataStream[Row], and then converted back to a Table to be used in an additional CREATE TABLE and then INSERT INTO statement.
> > > > >
> > > > > This is why I have these conversions back and forth, and why I somehow need a way to register the legacy types as a valid type of the table. Hope that clarifies a bit; since the pipeline is rather complex, I can't really share an MVCE of it.
> > > > >
> > > > > On Mon, Dec 28, 2020 at 11:08 AM Timo Walther wrote:
> > > > > > Hi Yuval,
> > > > > >
> > > > > > the legacy type has no string representation that can be used in a SQL DDL statement. The current string representation LEGACY(...) is only a temporary workaround to persist the old types in catalogs.
> > > > > >
> > > > > > Until FLIP-136 is fully implemented, toAppendStream/toRetractStream support only legacy type info. So I would recommend using the legacy type in the UDF return type as well. Either you use the old `getResultType` method or you override `getTypeInference` and call `TypeConversions.fromLegacyInfoToDataType`.
> > > > > >
> > > > > > Another workaround could be that you simply use `BYTES` as the return type and pass around a byte array instead.
> > > > > >
> > > > > > Maybe you can show us a little end-to-end example of what you are trying to achieve?
> > > > > >
> > > > > > Regards,
> > > > > > Timo
> > > > > >
> > > > > > On 28.12.20 07:47, Yuval Itzchakov wrote:
> > > > > > > Hi Danny,
> > > > > > >
> > > > > > > Yes, I tried implementing the DataTypeFactory for the UDF using TypeInformationRawType (which is deprecated BTW, and there's no support for RawType in the conversion), but it didn't help.
> > > > > > >
> > > > > > > I did manage to get the conversion working using TableEnvironment.toAppendStream (I was previously calling TypeConversions directly), but the problem remains that Flink can't register LEGACY types via the CREATE TABLE DDL.
> > > > > > >
> > > > > > > On Mon, Dec 28, 2020, 04:25 Danny Chan wrote:
> > > > > > > > > SQL parse failed. Encount
> > > > > > > >
> > > > > > > > What syntax did you use?
> > > > > > > >
> > > > > > > > > TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type back to TypeInformation.
> > > > > > > >
> > > > > > > > Did you try to construct the type information with a fresh TypeInformationRawType?
> > > > > > > >
> > > > > > > > On Thu, Dec 24, 2020 at 7:24 PM Yuval Itzchakov wrote:
> > > > > > > > > An expansion to my question:
> > > > > > > > >
> > > > > > > > > What I really want is for the UDF to return `RAW(io.circe.Json, ?)` type, but I have to do a conversion between Table and DataStream, and TypeConversions.fromDataTypeToLegacyInfo cannot convert a plain RAW type back to TypeInformation.
> > > > > > > > >
> > > > > > > > > On Thu, Dec 24, 2020 at 12:59 PM Yuval Itzchakov wrote:
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > I have a UDF which returns a type of MAP<STRING, LEGACY('RAW', 'ANY<io.circe.Json>')>. When I try to register this type with Flink via the CREATE TABLE DDL, I encounter an exception:
> > > > > > > > > >
> > > > > > > > > > - SQL parse failed. Encountered "(" at line 2, column 256.
> > > > > > > > > > Was expecting one of:
> > > > > > > > > > "NOT" ...
> > > > > > > > > > "NULL" ...
> > > > > > > > > > ">" ...
> > > > > > > > > > "MULTISET" ...
> > > > > > > > > > "ARRAY" ...
> > > > > > > > > > "." ...
> > > > > > > > > >
> > > > > > > > > > It looks like the planner doesn't like the round brackets on the LEGACY type. What is the correct way to register the table with this type with Flink?

--
Best Regards,
Yuval Itzchakov.
