Vishal, the answer to your question about IngestionTime is "no". Ingestion time in this context means the time the data was read by Flink, not the time it was written to Kafka.

To get the effect you're looking for you have to set TimeCharacteristic.EventTime and follow the instructions here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010

You still need the assignTimestampsAndWatermarks code you provided in your original email (quoted at the bottom of this message), and on the producer side you also have to do:

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
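Putting the consumer side together, it would look roughly like the sketch below. This is a minimal, untested sketch: the class name, topic name, broker properties and the plain-String payload with SimpleStringSchema are all placeholders, and in your case the element type would be your KafkaRecord with its own deserialization schema.

import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaEventTimeExample {

    public static void main(String[] args) throws Exception {
        // Placeholder broker/topic settings, substitute your own.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // EventTime (not IngestionTime) is what lets the Kafka record timestamp flow through.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);

        // Per the connector doc linked above, the 0.10 consumer attaches the Kafka record
        // timestamp (KAFKA-2511) to each element under event time, so previousElementTimestamp
        // is already that timestamp; the assigner just forwards it and emits a watermark.
        consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
                return new Watermark(extractedTimestamp);
            }

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                return previousElementTimestamp;
            }
        });

        env.addSource(consumer).print();
        env.execute("kafka-event-time-example");
    }
}

Once the stream carries these timestamps, anything downstream (windows, the StreamingFileSink bucket assigner, etc.) sees the Kafka record timestamp as the event time. There is also a rough sketch of a bucket assigner that buckets by this timestamp at the very end of this message, below your quoted snippet.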
On Wed, Jan 30, 2019 at 2:45 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> Thank you. This though is a little different.
>
> The producer of the kafka message attaches a timestamp
> (https://issues.apache.org/jira/browse/KAFKA-2511). I do not see how I can
> get to that timestamp through any stream abstraction over the
> FlinkKafkaConsumer API, even though it is available here
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
> and is being used here
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>
> All I want to do is this:
>
> * Pull from a kafka topic. This topic is being written to with a timestamp
>   on each kafka record.
> * Write to hdfs using the StreamingFileSink, BUT make buckets that *honor
>   the ingestion time's watermark*.
>
> Question is:
>
> If we have TimeCharacteristic as IngestionTime, does the context's
> watermark in getBucketId(KafkaRecord element, Context context) in
> BucketAssigner
> (https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html)
> reflect the kafka record timestamp in
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html,
> given that "automatic timestamp assignment and automatic watermark
> generation" is done if TimeCharacteristic is IngestionTime
> (https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html)?
>
> Regards.
>
> On Tue, Jan 29, 2019 at 8:42 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> Hi Vishal,
>> May this doc [1] be helpful for you.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>
>> Best,
>> Congxian
>>
>> Vishal Santoshi <vishal.santo...@gmail.com> wrote on Wed, Jan 30, 2019 at 4:36 AM:
>>
>>> It seems from
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
>>> that TimeCharacteristic.IngestionTime should do the trick.
>>>
>>> Just wanted to confirm that the ingestion time is the event time
>>> provided by the kafka producer.
>>>
>>> On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> In the case where one needs to use kafka event time (ingestion time)
>>>> for watermark generation and timestamp extraction, is setting the
>>>> TimeCharacteristic to EventTime enough?
>>>>
>>>> Or is this explicit code required?
>>>>
>>>> consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<KafkaRecord>() {
>>>>     @Nullable
>>>>     @Override
>>>>     public Watermark checkAndGetNextWatermark(KafkaRecord lastElement, long extractedTimestamp) {
>>>>         return new Watermark(extractedTimestamp);
>>>>     }
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(KafkaRecord element, long previousElementTimestamp) {
>>>>         return previousElementTimestamp;
>>>>     }
>>>> });
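Coming back to the bucketing part of the question quoted above: once the event-time setup is in place, context.timestamp() inside getBucketId() carries the Kafka record timestamp rather than the wall-clock time at which Flink read the record, so a custom BucketAssigner can bucket on it directly. Below is a rough, untested sketch; the class name and the hourly bucket format are mine, and it assumes the Context methods from the 1.7 BucketAssigner javadoc linked in the quoted mail (timestamp(), currentProcessingTime()).

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

/** Buckets each record by the event timestamp Flink attached upstream. */
public class RecordTimestampBucketAssigner<IN> implements BucketAssigner<IN, String> {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(IN element, Context context) {
        // context.timestamp() is the element's assigned timestamp; with the EventTime plus
        // Kafka-timestamp setup above, that is the timestamp the producer wrote to Kafka.
        Long ts = context.timestamp();
        long bucketTime = (ts != null) ? ts : context.currentProcessingTime();
        return FORMAT.format(Instant.ofEpochMilli(bucketTime));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

It would then be plugged into the sink with something along the lines of
StreamingFileSink.forRowFormat(basePath, encoder).withBucketAssigner(new RecordTimestampBucketAssigner<>()).build()
which, again, is a sketch rather than something I have run.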