Hi Rinat, I tried the situation you described and it works fine for me. The partCounter is incremented as we would hope, and when a new part file is created I did not see any duplicate part index. Here is my code for that, you can take a look. In my case, the max index among the finished part files is part-0-683PartSuffix; everything after that is still kept as _part-0-684PartSuffix.pending, _part-0-685PartSuffix.pending, and so on, since the checkpoint has not finished yet.
Cheers,
Minglei.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

import java.util.UUID;

public class TestSuffix {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        String outputPath = params.getRequired("outputPath");

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
        sEnv.enableCheckpointing(200);
        sEnv.setParallelism(1);

        // Small batch size and a custom part suffix so that part files
        // roll frequently and the counter behaviour is easy to observe.
        BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
                new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
                        .setInactiveBucketThreshold(1000)
                        .setInactiveBucketCheckInterval(1000)
                        .setPartSuffix("PartSuffix")
                        .setBatchSize(500);

        sEnv.addSource(new DataGenerator())
                .keyBy(0)
                .map(new CountUpRichMap())
                .addSink(sink);

        sEnv.execute();
    }

    // Keyed map that attaches a per-key running counter to each record.
    public static class CountUpRichMap extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {

        private ValueState<Integer> counter;

        @Override
        public void open(Configuration parameters) throws Exception {
            counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
        }

        @Override
        public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
            Integer counterValue = counter.value();
            if (counterValue == null) {
                counterValue = 0;
            }
            counter.update(counterValue + 1);
            return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
        }
    }

    // Bounded source emitting 10000 records spread over 10 keys.
    public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {

        @Override
        public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
            }
        }

        @Override
        public void cancel() {
        }
    }
}

> On Jun 16, 2018, at 10:21 PM, Rinat <r.shari...@cleverdata.ru> wrote:
>
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the
> suffix of the part file. It's very useful when it's necessary to set a specific
> extension for the file.
>
> During usage, I found an issue: when a new part file is created, it has the same
> part index as the index of the just closed file. So, when Flink tries to move it
> into the final state, we get a FileAlreadyExistsException.
>
> This problem is related to the following code. Here we are trying to find the max
> index of a part file that doesn't exist in the bucket directory; the problem is
> that the partSuffix is not involved in the path assembly. This means that the path
> never exists, and partCounter is never incremented.
>
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>         fs.exists(getPendingPathFor(partPath)) ||
>         fs.exists(getInProgressPathFor(partPath))) {
>     bucketState.partCounter++;
>     partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
>
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
>
> Before creating the writer, we append the partSuffix here, but it should already
> have been appended, before the index checks:
>
> if (partSuffix != null) {
>     partPath = partPath.suffix(partSuffix);
> }
>
> I'll create an issue and try to submit a fix.
>
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
>
> email: r.shari...@cleverdata.ru
> mobile: +7 (925) 416-37-26
>
> CleverDATA
> make your data clever
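For readers following the thread: a minimal sketch of the fix Rinat describes, namely building the candidate path with the suffix before probing for existing files, could look like the following. The assemblePartPath helper is a hypothetical name introduced here for illustration; getPendingPathFor and getInProgressPathFor are the existing methods from the snippet quoted above.

// Hypothetical helper: apply partSuffix when the candidate path is built,
// so the existence checks below probe the file names that will actually
// be written.
private Path assemblePartPath(Path bucketPath, int subtaskIndex, long partCounter) {
    String fileName = partPrefix + "-" + subtaskIndex + "-" + partCounter;
    if (partSuffix != null) {
        fileName = fileName + partSuffix;
    }
    return new Path(bucketPath, fileName);
}

// The search loop from the quoted snippet then becomes:
Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
while (fs.exists(partPath) ||
        fs.exists(getPendingPathFor(partPath)) ||
        fs.exists(getInProgressPathFor(partPath))) {
    bucketState.partCounter++;
    partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
}

With the suffix included up front, the existence checks only pass for an index that is genuinely unused, so partCounter advances past already closed files and the FileAlreadyExistsException on rename should no longer occur.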