
Thanks for the PR! I'll have a look at it later today.

The problem of the retraction stream conversion is probably that the return
type is a Tuple2[Boolean, Row].
The boolean flag indicates whether the row is added or retracted.

> Thanks Fabian, filed FLINK-9742 [1].
> I'll submit a PR for FLINK-8094 via providing my TimestampExtractor. The
> implementation is also described as FLINK-9742. I'll start with current
> implementation which just leverages automatic cast from STRING to
> SQL_TIMESTAMP, but we could improve it from PR. Feedbacks are welcome!
> Btw, maybe need to initiate from another thread, but I also had to
> struggle to find a solution to convert table to retract stream. Looks like
> "implicit conversion" comes into play prior to toRetractStream and raise
> error. outTable is the result of "distinct" which looks like requiring
> retract mode. (Not even easy for me to know I should provide implicit
> TypeInformation for Row, but I'm fairly new to Scala so it's just me.)
> // below doesn't work as below line implicitly converts table as 'append 
> stream'
> // via org.apache.flink.table.api.scala.package$.table2RowDataStream
> // though we are calling toRetractStream
> //outTable.toRetractStream[Row](outTable.dataType).print()
> implicit val typeInfo = Types.ROW(outTable.getSchema.getColumnNames,
>   outTable.getSchema.getTypes)
> tableEnv.toRetractStream[Row](outTable).print()
> [1] https://issues.apache.org/jira/browse/FLINK-9742
>> 1) Yes, I think we should make resultType() public. Please open a Jira
>> issue and describe your use case.
>> Btw. would you like to contribute your TimestampExtractor to Flink (or
>> even a more generic one that allows to configure the format of the
>> timestamp string)? There is FLINK-8094 [1].
>> 2) This is "expected" because you define two different schemas, the JSON
>> schema which defines how to read the data and the Table schema that defines
>> how it is exposed to the Table API / SQL.
>> [1] https://issues.apache.org/jira/browse/FLINK-8094
>>> 1. I had to apply a hack (create new TimestampExtractor class to package
>>> org.apache.flink.blabla...) since Expression.resultType is defined as
>>> "package private" for flink. I feel adjusting scope of Explain's methods
>>> (at least resultType) to "public" would help on implementing custom
>>> TimestampExtractor in users' side: please let me know your thought about
>>> this. If you think it makes sense, I will file an issue and submit a PR, or
>>> initiate a new thread in dev mailing list to discuss it if the step is
>>> recommend.
>>> 2. To ensure KafkaTableSource's verification of rowtime field type, the
>>> type of field (here in "eventTime") should be defined as SQL_TIMESTAMP
>>> whereas the type of field in JSON should be defined as STRING.
>>> Kafka010JsonTableSource.builder()
>>>   .forTopic(topic)
>>>   .withSchema(TableSchema.builder()
>>>     .field("eventTime", Types.SQL_TIMESTAMP)
>>>     .build())
>>>   .forJsonSchema(TableSchema.builder()
>>>     .field("eventTime", Types.STRING)
>>>     .build())
>>>   .withKafkaProperties(prop)
>>>   .withRowtimeAttribute(
>>>     "eventTime",
>>>     new IsoDateStringAwareExistingField("eventTime"),
>>>     new BoundedOutOfOrderTimestamps(Time.minutes(1).toMilliseconds)
>>>   )
>>>   .build()
>>>> If it is "only" about the missing support to parse a string as
>>>> timestamp, you could also implement a custom TimestampExtractor that works
>>>> similar to the ExistingField extractor [1].
>>>> You would need to adjust a few things and use the expression
>>>> "Cast(Cast('tsString, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)" to convert
>>>> the String to a Long.
>>>> So far this works only if the date is formatted like "2018-05-28
>>>> 12:34:56.000"
>>>> Regarding the side outputs, these would not be handled as results but
>>>> just redirect late records into separate data streams. We would offer a
>>>> configuration to write them to a sink like HDFS or Kafka.
>>>> [1] https://github.com/apache/flink/blob/master/flink-
>>>> libraries/flink-table/src/main/scala/org/apache/flink/
>>>> table/sources/tsextractors/ExistingField.scala
>>>>> I guess I already tried out with KafkaJsonTableSource and failed back
>>>>> to custom TableSource since the type of rowtime field is string
>>>>> unfortunately, and I needed to parse and map to new SQL timestamp field in
>>>>> order to use it to rowtime attribute.
>>>>> I guess JSON -> table fields mapping is provided only for renaming,
>>>>> and "withRowtimeAttribute" doesn't help defining new field to use it as
>>>>> rowtime.
>>>>> Are there better approaches on this scenario? Or would we be better to
>>>>> assume the type of rowtime field is always timestamp?
>>>>> Btw, providing late-data side output in Table API might be just a
>>>>> matter of how to define it correctly (not a technical or syntactic issue),
>>>>> though providing in SQL might be tricky (as the semantic of SQL query is
>>>>> not for multiple outputs).
>>>>>> Flink 1.5.0 features a TableSource for Kafka and JSON [1], incl.
>>>>>> timestamp & watemark generation [2].
>>>>>> It would be great if you could let us know, if that addresses your
>>>>>> use case and if not what's missing or not working.
>>>>>> So far Table API / SQL does not have support for late-data side
>>>>>> outputs. However, that's on the road map. The idea is to filter streams
>>>>>> during ingestion for late events and passing them to a side output.
>>>>>> Currently, operators just drop late events.
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-
>>>>>> release-1.5/dev/table/sourceSinks.html#kafkajsontablesource
>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-
>>>>>> release-1.5/dev/table/sourceSinks.html#configuring-
>>>>>> a-rowtime-attribute
>>>>>>> The watermark display in the UI is bugged in 1.5.0.
>>>>>>> It is fixed on master and the release-1.5 branch, and will be
>>>>>>> included in 1.5.1 that is slated to be released next week.
>>>>>>> Sorry I forgot to mention the version: Flink 1.5.0, and I ran the
>>>>>>> app in IntelliJ, not tried from cluster.
>>>>>>>> Hi Flink users,
>>>>>>>> I'm new to Flink and trying to evaluate couple of streaming
>>>>>>>> frameworks via implementing same apps.
>>>>>>>> While implementing apps with both Table API and SQL, I found
>>>>>>>> there's 'no watermark' presented in Flink UI, whereas I had been 
>>>>>>>> struggling
>>>>>>>> to apply row time attribute.
>>>>>>>> For example, below is one of TableSource implementation which wraps
>>>>>>>> DataStream reading from Kafka.
>>>>>>>> https://github.com/HeartSaVioR/iot-trucking-app-
>>>>>>>> flink/blob/master/src/main/scala/net/heartsavior/flink/
>>>>>>>> datasource/TruckSpeedSource.scala
>>>>>>>> (Actually I ended up implementing TableSource to address adding
>>>>>>>> rowtime attribute as well as reading and parsing JSON. I'd be really 
>>>>>>>> happy
>>>>>>>> if someone can guide a way to get rid of needed of custom 
>>>>>>>> implementation of
>>>>>>>> TableSource.)
>>>>>>>> and below is one of app I implemented:
>>>>>>>> https://github.com/HeartSaVioR/iot-trucking-app-
>>>>>>>> flink/blob/master/src/main/scala/net/heartsavior/flink/app/sql/
>>>>>>>> IotTruckingAppMovingAggregationsOnSpeedSql.scala
>>>>>>>> Btw, I'm about to experiment side-output with late events, but is
>>>>>>>> it possible to leverage side-output with Table API / SQL? Looks like
>>>>>>>> DataStream exposes late events only when it's converted to
>>>>>>>> AllWindowedStream.
>>>>>>>> Thanks in advance!
