Vishal, the answer to your question about IngestionTime is "no".
Ingestion time in this context means the time the data was read by Flink,
not the time it was written to Kafka.

To get the effect you're looking for, you have to set
TimeCharacteristic.EventTime and follow the instructions here:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
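
For reference, a minimal sketch of the consumer side under that setting. The
class name, topic, and the connection properties below are placeholders, not
taken from your job:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaEventTimeJob {
    public static void main(String[] args) throws Exception {
        // With event time enabled, the 0.10 consumer attaches the Kafka
        // record's timestamp to each element it emits.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Placeholder connection properties, not from your job.
        Properties standardProps = new Properties();
        standardProps.setProperty("bootstrap.servers", "localhost:9092");
        standardProps.setProperty("group.id", "my-group");

        String topic = "my-topic";
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), standardProps);

        // Watermarks still have to come from your own assigner, e.g.
        // consumer.assignTimestampsAndWatermarks(...) as in your original email.
        DataStream<String> streamWithTimestamps = env.addSource(consumer);

        // ... attach the producer / sink here, then call env.execute(...) ...
    }
}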

You still need the code you provided in your original email above, and on the
producer side you also have to do:

FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                streamWithTimestamps, topic, new SimpleStringSchema(), standardProps);
config.setWriteTimestampToKafka(true);
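
With that in place, the element timestamps the sink sees are the timestamps
the Kafka producer wrote. A rough sketch of a BucketAssigner that buckets by
them; KafkaRecord and the yyyy-MM-dd--HH bucket format are placeholders, not
your actual types:

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Sketch only: buckets each record by its event-time timestamp, which with the
// setup above is the timestamp the producer attached to the Kafka record.
public class EventTimeBucketAssigner implements BucketAssigner<KafkaRecord, String> {

    @Override
    public String getBucketId(KafkaRecord element, Context context) {
        // context.timestamp() is the element's event-time timestamp; it can be
        // null if no timestamp was assigned upstream.
        return new SimpleDateFormat("yyyy-MM-dd--HH").format(new Date(context.timestamp()));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}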

On Wed, Jan 30, 2019 at 2:45 AM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Thank you. This, though, is a little different.
>
> The producer of the Kafka message attaches a timestamp
> (https://issues.apache.org/jira/browse/KAFKA-2511). I do not see how I can
> get to that timestamp through any stream abstraction over the
> FlinkKafkaConsumer API, even though it is available on
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
> and is used here:
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L141
>
> All I want to do is this:
>
> * Pull from a Kafka topic. This topic is being written to with a timestamp
> on each Kafka record.
> * Write to HDFS using StreamingFileSink, BUT make buckets that honor the
> ingestion time's watermark.
>
> The question is:
>
> If we have TimeCharacteristic set to IngestionTime, does the context's
> watermark in getBucketId(KafkaRecord element, Context context) in
> BucketAssigner
> (https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html)
> reflect the Kafka record timestamp in
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html,
> given that "automatic timestamp assignment and automatic watermark
> generation" is done if TimeCharacteristic is IngestionTime (see
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html)?
>
>
> Regards.
>
> On Tue, Jan 29, 2019 at 8:42 PM Congxian Qiu <qcx978132...@gmail.com>
> wrote:
>
>> Hi Vishal,
>> Maybe this doc [1] will be helpful for you.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>> Best,
>> Congxian
>>
>>
>> Vishal Santoshi <vishal.santo...@gmail.com> wrote on Wed, Jan 30, 2019 at 4:36 AM:
>>
>>> It seems from
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html
>>> that TimeCharacteristic.IngestionTime should do the trick.
>>>
>>> Just wanted to confirm that the ingestion time is the event time
>>> provided by the Kafka producer.
>>>
>>> On Tue, Jan 29, 2019 at 3:21 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> In a case where one needs to use Kafka event time (ingestion time)
>>>> for watermark generation and timestamp extraction, is setting
>>>> TimeCharacteristic to EventTime enough?
>>>>
>>>> Or is this explicit code required?
>>>>
>>>> consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<KafkaRecord>() {
>>>>     @Nullable
>>>>     @Override
>>>>     public Watermark checkAndGetNextWatermark(KafkaRecord lastElement, long extractedTimestamp) {
>>>>         return new Watermark(extractedTimestamp);
>>>>     }
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(KafkaRecord element, long previousElementTimestamp) {
>>>>         return previousElementTimestamp;
>>>>     }
>>>> });
>>>>
