Hi Fabian:

        I think it would be better without such a limitation. I would also like 
to ask about another problem. When I use BucketingSink (writing to AWS S3), a 
few files are not renamed after a checkpoint, so a small number of the final 
files still carry the underscore prefix. After some analysis, I found that this 
is caused by the eventual consistency of S3. Is there a better solution 
available? Thanks.

Best
Ben

        
https://issues.apache.org/jira/browse/FLINK-8794?jql=text%20~%20%22BucketingSink%22
 

> On Apr 10, 2018, at 6:29 PM, Ben Yan <yan.xiao.bin.m...@gmail.com> wrote:
> 
> Hi Fabian.
> 
>       If I use a ProcessFunction, I can get it! But I would like to know how 
> to get the Kafka timestamp in methods like flatMap and map of a DataStream, 
> using the Scala programming language.
> Thanks!
> 
> Best
> Ben
> 
>> On Apr 4, 2018, at 7:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>> Hi Navneeth,
>> 
>> Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if 
>> you configure EventTime for an application [1].
>> Since Flink treats record timestamps as metadata, they are not directly 
>> accessible by most functions. You can implement a ProcessFunction [2] to 
>> access the timestamp of a record via the ProcessFunction's Context object.
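
As a rough illustration of the ProcessFunction approach described above, the
following Scala sketch copies the record timestamp into the record itself, so
that later map or flatMap calls can read it as an ordinary field. It assumes
the Flink 1.4 Scala API and that event time is configured as in [1]. The class
name AttachTimestamp is hypothetical and not part of Flink.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical helper (not part of Flink): pairs each record with the
// timestamp that Flink carries as metadata alongside it.
class AttachTimestamp[T] extends ProcessFunction[T, (T, Long)] {
  override def processElement(
      value: T,
      ctx: ProcessFunction[T, (T, Long)]#Context,
      out: Collector[(T, Long)]): Unit = {
    // timestamp() returns a boxed java.lang.Long and may be null, for
    // example when the job runs with processing time instead of event time.
    val ts: java.lang.Long = ctx.timestamp()
    // Assumption for this sketch: records without a timestamp are dropped.
    if (ts != null) {
      out.collect((value, ts.longValue()))
    }
  }
}
```

Assuming `stream` is a DataStream[String] read from Kafka and
`org.apache.flink.streaming.api.scala._` is imported, something like
`stream.process(new AttachTimestamp[String])` should yield a
DataStream[(String, Long)] whose second field is the timestamp.
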
>> 
>> Best, Fabian
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction
>> 
>> 2018-03-30 7:45 GMT+02:00 Ben Yan <yan.xiao.bin.m...@gmail.com>:
>> Hi,
>> Is that what you mean?
>> See: 
>> https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145
>> 
>> Best
>> Ben
>> 
>>> On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan <reachnavnee...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> Is there a way to get the Kafka timestamp in the deserialization schema? 
>>> All records are written to Kafka with a timestamp, and I would like to 
>>> attach that timestamp to every record that is ingested. Thanks.
>> 
>> 
> 
