Hi, Rinat

I tried the situation you described and it works fine for me. The partCounter 
is incremented as we would hope: when a new part file is created, I did not see 
any duplicate part index. Here is my code for that, you can take a look.
In my case, the max index of the finished part files is part-0-683PartSuffix; 
everything after that is still _part-0-684PartSuffix.pending, 
_part-0-685PartSuffix.pending and so on, since the checkpoint has not finished yet.

Cheers
Minglei.

import java.util.UUID;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class TestSuffix {
   public static void main(String[] args) throws Exception {
      ParameterTool params = ParameterTool.fromArgs(args);
      String outputPath = params.getRequired("outputPath");

      StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();

      sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
      sEnv.enableCheckpointing(200);
      sEnv.setParallelism(1);

      // Small batch size and short inactivity thresholds force frequent
      // part-file rolls, so the part counter is exercised many times.
      BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
         new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
            .setInactiveBucketThreshold(1000)
            .setInactiveBucketCheckInterval(1000)
            .setPartSuffix("PartSuffix")
            .setBatchSize(500);

      sEnv.addSource(new DataGenerator())
         .keyBy(0)
         .map(new CountUpRichMap())
         .addSink(sink);

      sEnv.execute();
   }

   /** Appends a per-key running counter to each incoming record. */
   public static class CountUpRichMap extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {

      private ValueState<Integer> counter;

      @Override
      public void open(Configuration parameters) throws Exception {
         counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
      }

      @Override
      public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
         Integer counterValue = counter.value();
         if (counterValue == null) {
            counterValue = 0;
         }
         counter.update(counterValue + 1);
         return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
      }
   }

   /** Emits 10,000 records spread across 10 keys. */
   public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {

      @Override
      public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
         for (int i = 0; i < 10000; i++) {
            ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
         }
      }

      @Override
      public void cancel() {
      }
   }
}
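
For reference, the counter probe that the quoted message below analyses can be 
sketched in isolation. This is a hypothetical stand-alone simulation (a plain 
Set<String> stands in for the filesystem, and the class and method names are 
made up for illustration, not taken from BucketingSink):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation of the part-counter probe: keep incrementing the
// counter until the assembled file name is not already taken.
public class PartCounterSketch {

   static String assemblePath(String prefix, int subtask, int counter, String suffix) {
      return prefix + "-" + subtask + "-" + counter + suffix;
   }

   static int nextFreeCounter(Set<String> existing, String prefix, int subtask,
                              int counter, String suffix) {
      // If the suffix is not part of the probed name, the existence check
      // never matches a file that was written with a suffix, so the
      // counter is never advanced past an already-used index.
      while (existing.contains(assemblePath(prefix, subtask, counter, suffix))) {
         counter++;
      }
      return counter;
   }

   public static void main(String[] args) {
      Set<String> existing = new HashSet<>();
      existing.add("part-0-0PartSuffix");
      existing.add("part-0-1PartSuffix");

      // Buggy probe: suffix omitted, so "part-0-0" is never found in the
      // set and the counter stays at 0, colliding with an existing file.
      int buggy = nextFreeCounter(existing, "part", 0, 0, "");
      // Fixed probe: suffix included, so the counter skips past 0 and 1.
      int fixed = nextFreeCounter(existing, "part", 0, 0, "PartSuffix");

      System.out.println(buggy + " " + fixed);  // prints "0 2"
   }
}
```

The simulation makes the failure mode easy to see: only the probe that 
includes the suffix in the assembled name ever advances the counter.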




> On 16 Jun 2018, at 22:21, Rinat <r.shari...@cleverdata.ru> wrote:
> 
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the 
> suffix of the part file. It’s very useful when it’s necessary to set a specific 
> file extension.
> 
> While using it, I’ve found an issue: when a new part file is created, it 
> has the same part index as the index of the file that was just closed. 
> So, when Flink tries to move it into its final state, we get a 
> FileAlreadyExistsException.
> 
> This problem is related to the following code. 
> Here we try to find the max part-file index that doesn’t exist in the 
> bucket directory; the problem is that the partSuffix is not involved in 
> the path assembly. This means the assembled path never exists, 
> so partCounter is never incremented.
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>       fs.exists(getPendingPathFor(partPath)) ||
>       fs.exists(getInProgressPathFor(partPath))) {
>    bucketState.partCounter++;
>    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
> 
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> 
> Before creating the writer, we append the partSuffix here, but it should 
> already have been appended before the index checks above:
> if (partSuffix != null) {
>    partPath = partPath.suffix(partSuffix);
> }
> I’ll create an issue and try to submit a fix.
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 
