Hi, Thanks for the PR! I'll have a look at it later today.
The problem with the retraction stream conversion is probably that the return type is a
Tuple2[Boolean, Row]. The boolean flag indicates whether the row is added or retracted.

Best, Fabian

2018-07-04 15:38 GMT+02:00 Jungtaek Lim <kabh...@gmail.com>:

> Thanks Fabian, filed FLINK-9742 [1].
>
> I'll submit a PR for FLINK-8094 via providing my TimestampExtractor. The
> implementation is also described in FLINK-9742. I'll start with the current
> implementation, which just leverages the automatic cast from STRING to
> SQL_TIMESTAMP, but we could improve it in the PR. Feedback is welcome!
>
> Btw, maybe this needs to start as another thread, but I also had to
> struggle to find a solution to convert a table to a retract stream. It looks
> like an "implicit conversion" comes into play before toRetractStream and
> raises an error. outTable is the result of "distinct", which looks like it
> requires retract mode. (It wasn't even easy for me to figure out that I should
> provide an implicit TypeInformation for Row, but I'm fairly new to Scala, so
> it may just be me.)
>
> // below doesn't work, as this line implicitly converts the table to an
> // 'append stream' via org.apache.flink.table.api.scala.package$.table2RowDataStream,
> // even though we are calling toRetractStream
> //outTable.toRetractStream[Row](outTable.dataType).print()
>
> implicit val typeInfo = Types.ROW(outTable.getSchema.getColumnNames,
>   outTable.getSchema.getTypes)
> tableEnv.toRetractStream[Row](outTable).print()
>
> Thanks again,
> Jungtaek Lim (HeartSaVioR)
>
> [1] https://issues.apache.org/jira/browse/FLINK-9742
>
> On Wed, Jul 4, 2018 at 10:03 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Glad you could get it to work! That's great :-)
>>
>> Regarding your comments:
>>
>> 1) Yes, I think we should make resultType() public. Please open a Jira
>> issue and describe your use case.
>> Btw. would you like to contribute your TimestampExtractor to Flink (or
>> even a more generic one that allows configuring the format of the
>> timestamp string)?
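Fabian's point about the Tuple2[Boolean, Row] return type can be illustrated without Flink at all. Below is a minimal plain-Scala sketch (no Flink dependency; `RetractFold` and the String "rows" standing in for Flink Rows are made up for illustration) of what a consumer of a retract stream effectively does: fold the (add/retract, row) pairs into the current result.

```scala
// Illustrative only: fold a retract stream's (Boolean, row) pairs into the
// current multiset of rows. true = the row is added, false = retracted,
// mirroring the boolean flag in Tuple2[Boolean, Row].
object RetractFold {
  def fold(changes: Seq[(Boolean, String)]): Map[String, Int] =
    changes.foldLeft(Map.empty[String, Int]) {
      case (acc, (true, row)) =>
        acc.updated(row, acc.getOrElse(row, 0) + 1)      // add one occurrence
      case (acc, (false, row)) =>
        val n = acc.getOrElse(row, 0) - 1                // retract one occurrence
        if (n <= 0) acc - row else acc.updated(row, n)
    }
}
```

For example, adding "a" twice and retracting it once leaves a single "a" in the result.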
There is FLINK-8094 [1].
>>
>> 2) This is "expected", because you define two different schemas: the JSON
>> schema, which defines how to read the data, and the table schema, which
>> defines how it is exposed to the Table API / SQL.
>>
>> Thanks, Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8094
>>
>> 2018-07-04 14:52 GMT+02:00 Jungtaek Lim <kabh...@gmail.com>:
>>
>>> Thanks again Fabian for the nice suggestion!
>>>
>>> I finally got it working by applying your suggestion. A couple of tricks
>>> were needed:
>>>
>>> 1. I had to apply a hack (creating a new TimestampExtractor class in the
>>> package org.apache.flink.blabla...) since Expression.resultType is defined as
>>> "package private" for flink. I feel that adjusting the scope of Expression's
>>> methods (at least resultType) to "public" would help with implementing a
>>> custom TimestampExtractor on the users' side: please let me know your
>>> thoughts about this. If you think it makes sense, I will file an issue and
>>> submit a PR, or initiate a new thread on the dev mailing list to discuss it
>>> if that step is recommended.
>>>
>>> 2. To satisfy KafkaTableSource's verification of the rowtime field type,
>>> the type of the field (here, "eventTime") should be defined as SQL_TIMESTAMP,
>>> whereas the type of the field in JSON should be defined as STRING.
>>>
>>> Kafka010JsonTableSource.builder()
>>>   .forTopic(topic)
>>>   .withSchema(TableSchema.builder()
>>>     .field("eventTime", Types.SQL_TIMESTAMP)
>>>     .build())
>>>   .forJsonSchema(TableSchema.builder()
>>>     .field("eventTime", Types.STRING)
>>>     .build())
>>>   .withKafkaProperties(prop)
>>>   .withRowtimeAttribute(
>>>     "eventTime",
>>>     new IsoDateStringAwareExistingField("eventTime"),
>>>     new BoundedOutOfOrderTimestamps(Time.minutes(1).toMilliseconds)
>>>   )
>>>   .build()
>>>
>>> Thanks again!
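As context for the `BoundedOutOfOrderTimestamps(Time.minutes(1).toMilliseconds)` argument in the builder above: a bounded-out-of-orderness watermark trails the highest event timestamp seen so far by a fixed delay, and records behind the watermark count as late. The class below is a plain-Scala sketch of that rule (the class name and `isLate` helper are illustrative, not the Flink API).

```scala
// Sketch of the bounded-out-of-orderness rule: watermark = max timestamp
// seen so far minus a fixed delay; events behind the watermark are late.
final class BoundedOutOfOrderness(delayMillis: Long) {
  private var maxTs: Long = Long.MinValue

  def onEvent(ts: Long): Unit = maxTs = math.max(maxTs, ts)

  // No watermark until the first event, to avoid Long underflow.
  def watermark: Long =
    if (maxTs == Long.MinValue) Long.MinValue else maxTs - delayMillis

  def isLate(ts: Long): Boolean = ts < watermark
}
```

With a 60-second delay, an event 100 seconds behind the newest one seen is late, while an event 50 seconds behind is not.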
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Wed, Jul 4, 2018 at 8:18 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Jungtaek,
>>>>
>>>> If it is "only" about the missing support for parsing a string as a
>>>> timestamp, you could also implement a custom TimestampExtractor that works
>>>> similarly to the ExistingField extractor [1].
>>>> You would need to adjust a few things and use the expression
>>>> "Cast(Cast('tsString, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)" to convert
>>>> the String to a Long.
>>>> So far this works only if the date is formatted like "2018-05-28
>>>> 12:34:56.000".
>>>>
>>>> Regarding the side outputs, these would not be handled as results but
>>>> would just redirect late records into separate data streams. We would offer
>>>> a configuration to write them to a sink like HDFS or Kafka.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
>>>>
>>>> 2018-07-04 11:54 GMT+02:00 Jungtaek Lim <kabh...@gmail.com>:
>>>>
>>>>> Thanks Chesnay! Great news to hear. I'll try it out with the latest
>>>>> master branch.
>>>>>
>>>>> Thanks Fabian for providing the docs!
>>>>>
>>>>> I had already tried KafkaJsonTableSource and fell back to a custom
>>>>> TableSource, since the type of the rowtime field is unfortunately a string,
>>>>> and I needed to parse it and map it to a new SQL timestamp field in order
>>>>> to use it as the rowtime attribute.
>>>>>
>>>>> I guess the JSON -> table fields mapping is provided only for renaming,
>>>>> and "withRowtimeAttribute" doesn't help with defining a new field to use as
>>>>> the rowtime.
>>>>>
>>>>> Are there better approaches for this scenario? Or would we be better off
>>>>> assuming the type of the rowtime field is always timestamp?
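The nested cast Fabian mentions boils down to parsing a JDBC-style timestamp string such as "2018-05-28 12:34:56.000" into epoch milliseconds. A minimal JDK-only sketch of that conversion (the helper name `toEpochMillis` is made up; a real TimestampExtractor would express this as a Table API expression rather than plain code, and `Timestamp.valueOf` interprets the string in the JVM's local time zone):

```scala
// Parse a "yyyy-MM-dd HH:mm:ss[.SSS]" timestamp string to epoch millis,
// which is roughly what Cast(Cast('tsString, TIMESTAMP), LONG) achieves.
import java.sql.Timestamp

object TsParse {
  def toEpochMillis(ts: String): Long = Timestamp.valueOf(ts).getTime
}
```

Note that `Timestamp.valueOf` accepts only this JDBC escape format, which matches Fabian's caveat that the approach works only for dates formatted like "2018-05-28 12:34:56.000".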
>>>>>
>>>>> Btw, providing a late-data side output in the Table API might be just a
>>>>> matter of how to define it correctly (not a technical or syntactic issue),
>>>>> though providing it in SQL might be tricky (as the semantics of a SQL
>>>>> query don't allow for multiple outputs).
>>>>>
>>>>> Thanks,
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>> On Wed, Jul 4, 2018 at 5:49 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jungtaek,
>>>>>>
>>>>>> Flink 1.5.0 features a TableSource for Kafka and JSON [1], incl.
>>>>>> timestamp & watermark generation [2].
>>>>>> It would be great if you could let us know whether that addresses your
>>>>>> use case, and if not, what's missing or not working.
>>>>>>
>>>>>> So far the Table API / SQL does not have support for late-data side
>>>>>> outputs. However, that's on the road map. The idea is to filter streams
>>>>>> during ingestion for late events and pass them to a side output.
>>>>>> Currently, operators just drop late events.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#kafkajsontablesource
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#configuring-a-rowtime-attribute
>>>>>>
>>>>>> 2018-07-04 10:39 GMT+02:00 Chesnay Schepler <ches...@apache.org>:
>>>>>>
>>>>>>> The watermark display in the UI is bugged in 1.5.0.
>>>>>>>
>>>>>>> It is fixed on master and the release-1.5 branch, and will be
>>>>>>> included in 1.5.1, which is slated to be released next week.
>>>>>>>
>>>>>>> On 04.07.2018 10:22, Jungtaek Lim wrote:
>>>>>>>
>>>>>>> Sorry, I forgot to mention the version: Flink 1.5.0, and I ran the
>>>>>>> app in IntelliJ; I haven't tried it from a cluster.
>>>>>>>
>>>>>>> On Wed, Jul 4, 2018 at 5:15 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flink users,
>>>>>>>>
>>>>>>>> I'm new to Flink and am evaluating a couple of streaming
>>>>>>>> frameworks by implementing the same apps in each.
>>>>>>>>
>>>>>>>> While implementing apps with both the Table API and SQL, I found
>>>>>>>> there was 'no watermark' presented in the Flink UI, even though I had
>>>>>>>> been struggling to apply a row time attribute.
>>>>>>>>
>>>>>>>> For example, below is one of the TableSource implementations, which
>>>>>>>> wraps a DataStream reading from Kafka:
>>>>>>>>
>>>>>>>> https://github.com/HeartSaVioR/iot-trucking-app-flink/blob/master/src/main/scala/net/heartsavior/flink/datasource/TruckSpeedSource.scala
>>>>>>>>
>>>>>>>> (Actually, I ended up implementing a TableSource to handle adding the
>>>>>>>> rowtime attribute as well as reading and parsing JSON. I'd be really
>>>>>>>> happy if someone could show me a way to get rid of the need for a custom
>>>>>>>> implementation of TableSource.)
>>>>>>>>
>>>>>>>> And below is one of the apps I implemented:
>>>>>>>>
>>>>>>>> https://github.com/HeartSaVioR/iot-trucking-app-flink/blob/master/src/main/scala/net/heartsavior/flink/app/sql/IotTruckingAppMovingAggregationsOnSpeedSql.scala
>>>>>>>>
>>>>>>>> Btw, I'm about to experiment with side outputs for late events, but is
>>>>>>>> it possible to leverage side outputs with the Table API / SQL? It looks
>>>>>>>> like DataStream exposes late events only when it's converted to an
>>>>>>>> AllWindowedStream.
>>>>>>>>
>>>>>>>> Thanks in advance!
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>> Jungtaek Lim (HeartSaVioR)