Hi Raja,

I think you can in fact do this by implementing a custom Bucketer. You can have 
a look at BasePathBucketer and extend that to include the timestamp in the path 
that is returned. You should probably clamp the timestamp so that you don't get 
a new path for every millisecond.

Best,
Aljoscha

> On 1. Sep 2017, at 08:18, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Hi,
> 
> BucketingSink doesn’t support the feature that you are requesting, you can 
> not specify a dynamically generated prefix/suffix.
> 
> Piotrek
> 
>> On Aug 31, 2017, at 7:12 PM, Raja.Aravapalli <raja.aravapa...@target.com 
>> <mailto:raja.aravapa...@target.com>> wrote:
>> 
>>  
>> Hi,
>>  
>> I have a flink application that is streaming data into HDFS and I am using 
>> Bucketing Sink for that. And, I want to know if is it possible to rename the 
>> part files that is being created in the base hdfs directory.
>>  
>> Right now I am using the below code for including the timestamp into 
>> part-file name, but the problem I am facing is the timestamp is not changing 
>> for the new part file that is being rolled!
>>  
>>  
>> BucketingSink<String> HdfsSink = new BucketingSink<String> (hdfsOutputPath);
>> 
>> HdfsSink.setBucketer(new BasePathBucketer<String>());
>> HdfsSink.setBatchSize(1024 * 1024 * hdfsOutputBatchSizeInMB); // this means 
>> 'hdfsOutputBatchSizeInMB' MB
>> HdfsSink.setPartPrefix("PART-FILE-" + 
>> Long.toString(System.currentTimeMillis()));
>>  
>>  
>> Can someone please suggest me, what code changes I can try so that I get a 
>> new timestamp for every part file that is being rolled new?
>>  
>>  
>> Thanks a lot. 
>>  
>> Regards,
>> Raja.
> 

Reply via email to