Hi,

This is a known issue: https://issues.apache.org/jira/browse/FLINK-8500. And yes, the workaround is to write an assigner from scratch, but you can start by copying the code of AscendingTimestampExtractor.
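
As a rough illustration, a minimal sketch of such an assigner in Scala might look like the following. It is modeled on what AscendingTimestampExtractor does and assumes that the Kafka 0.10 consumer hands the record's Kafka timestamp to the assigner as previousElementTimestamp, as the documentation linked in the question describes. The class name KafkaTimestampAscendingAssigner is made up for this example, and unlike the original class it does not log or otherwise handle timestamps that go backwards.

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical name; not part of Flink.
class KafkaTimestampAscendingAssigner[T] extends AssignerWithPeriodicWatermarks[T] {

  // Highest timestamp seen so far; Long.MinValue means "nothing seen yet".
  private var currentTimestamp: Long = Long.MinValue

  // Assumption: previousElementTimestamp already carries the Kafka record
  // timestamp, so it is reused as the event timestamp unchanged.
  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
    if (previousElementTimestamp > currentTimestamp) {
      currentTimestamp = previousElementTimestamp
    }
    previousElementTimestamp
  }

  // The watermark trails the highest seen timestamp by one millisecond,
  // so elements carrying exactly that timestamp are not considered late.
  override def getCurrentWatermark(): Watermark =
    new Watermark(
      if (currentTimestamp == Long.MinValue) Long.MinValue
      else currentTimestamp - 1)
}
```

An instance of it (for example `new KafkaTimestampAscendingAssigner[Event]`) would then be passed to assignTimestampsAndWatermarks on the consumer in place of the anonymous AscendingTimestampExtractor.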

Sorry for the inconvenience.

--
Aljoscha

> On 22. Feb 2018, at 12:05, Federico D'Ambrosio <fedex...@gmail.com> wrote:
>
> Hello everyone,
>
> I'm consuming from a Kafka topic, on which I'm writing with a
> FlinkKafkaProducer, with the timestamp relative flag set to true.
>
> From what I gather from the documentation [1], Flink is aware of Kafka
> Record's timestamp and only the watermark should be set with an appropriate
> TimestampExtractor, still I'm failing to understand how to implement it in
> the right way.
>
> I thought that it would be possible to use the already existent
> AscendingTimestampExtractor, overriding the extractTimestamp method, but it's
> marked final.
>
> new FlinkKafkaConsumer010[Event](ingestion_topic, new JSONDeserializationSchema(), consumerConfig)
>   .setStartFromLatest()
>   .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[Event]() {
>     def extractAscendingTimestamp(element: Event): Long = ???
>   })
>
> Should I need to implement my own TimestampExtractor (with the appropriate
> getCurrentWatermark and extractTimestamp methods)?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>
> Thank you,
> Federico