Hi,

This is a known issue: https://issues.apache.org/jira/browse/FLINK-8500

And yes, the workaround is to write an assigner from scratch, but you can 
start by copying the code of AscendingTimestampExtractor.
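
For illustration, here is a minimal sketch of what such an assigner could 
look like in Scala. It is not the fix tracked in the JIRA issue. It assumes 
that the Kafka 0.10 consumer passes the record's Kafka timestamp to 
extractTimestamp as previousElementTimestamp, which is how I read the Kafka 
connector documentation. The class name KafkaAscendingTimestampAssigner is 
made up, and unlike AscendingTimestampExtractor the sketch ignores 
out-of-order timestamps silently instead of calling a violation handler.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical class name; the watermark logic is adapted from
// AscendingTimestampExtractor.
class KafkaAscendingTimestampAssigner[T] extends AssignerWithPeriodicWatermarks[T] {

  // Highest timestamp seen so far; Long.MinValue means "no element seen yet".
  private var currentTimestamp: Long = Long.MinValue

  // Assumption: previousElementTimestamp is the timestamp already attached
  // to the record by the Kafka 0.10 consumer, so it is reused unchanged.
  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
    if (previousElementTimestamp > currentTimestamp) {
      currentTimestamp = previousElementTimestamp
    }
    previousElementTimestamp
  }

  // The watermark trails the highest timestamp seen by one millisecond.
  override def getCurrentWatermark(): Watermark = {
    val ts = if (currentTimestamp == Long.MinValue) Long.MinValue else currentTimestamp - 1
    new Watermark(ts)
  }
}
```

An instance could then be passed to the consumer in place of the anonymous 
AscendingTimestampExtractor, e.g. 
assignTimestampsAndWatermarks(new KafkaAscendingTimestampAssigner[Event]).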

Sorry for the inconvenience.

--
Aljoscha

> On 22. Feb 2018, at 12:05, Federico D'Ambrosio <fedex...@gmail.com> wrote:
> 
> Hello everyone,
> 
> I'm consuming from a Kafka topic that I write to with a 
> FlinkKafkaProducer, with the timestamp-related flag set to true.
> 
> From what I gather from the documentation [1], Flink is aware of the Kafka 
> record's timestamp, and only the watermark needs to be set with an 
> appropriate TimestampExtractor. Still, I'm failing to understand how to 
> implement it the right way.
> 
> I thought it would be possible to use the existing 
> AscendingTimestampExtractor and override its extractTimestamp method, but 
> that method is marked final.
> 
> new FlinkKafkaConsumer010[Event](ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
>   .setStartFromLatest()
>   .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
>     def extractAscendingTimestamp(element: Event): Long = ???
>   })
> 
> Do I need to implement my own TimestampExtractor (with the appropriate 
> getCurrentWatermark and extractTimestamp methods)?
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
> 
> Thank you,
> Federico
> 
