gaoyunhaii commented on a change in pull request #18797: URL: https://github.com/apache/flink/pull/18797#discussion_r808637597
##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -958,6 +958,73 @@ val sink = FileSink
 {{< /tab >}}
 {{< /tabs >}}
+### Compaction
+
+Since version 1.15, `FileSink` supports compaction of `pending` files,
+which allows the application to use a smaller checkpoint interval without generating a large number of small files,
+especially when using [bulk-encoded formats]({{< ref "docs/connectors/datastream/filesystem#bulk-encoded-formats" >}}),
+which have to roll on every checkpoint.
+
+Compaction can be enabled as follows:
+
+{{< tabs "enablecompaction" >}}
+{{< tab "Java" >}}
+```java
+FileSink<Integer> fileSink =
+    FileSink.forRowFormat(new Path(path), new SimpleStringEncoder<Integer>())
+        .enableCompact(
+            FileCompactStrategy.Builder.newBuilder()
+                .setSizeThreshold(1024)
+                .enableCompactionOnCheckpoint(5)
+                .build(),
+            new RecordWiseFileCompactor<>(
+                new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
+        .build();
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val fileSink: FileSink[Integer] =
+    FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]())
+        .enableCompact(
+            FileCompactStrategy.Builder.newBuilder()
+                .setSizeThreshold(1024)
+                .enableCompactionOnCheckpoint(5)
+                .build(),
+            new RecordWiseFileCompactor(
+                new DecoderBasedReader.Factory(() => new SimpleStringDecoder)))
+        .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Once enabled, compaction happens between the time a file becomes `pending` and the time it gets committed. The pending files are
+first committed to temporary files whose paths start with `.`. Then these files are compacted according to
+the strategy, using the compactor specified by the user, and new compacted pending files are generated.
+These pending files are then emitted to the committer to be committed as the final files. After that, the source files are removed.
+
+When enabling compaction, you need to specify both the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}}
+and the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}}.
+
+The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}} specifies
+when and which files get compacted. Currently, there are two parallel trigger conditions: the target file size and the number of checkpoints passed since the last compaction.
+Once the total size of the cached files has reached the size threshold, or the number of checkpoints since the last compaction has reached the specified number,
+the cached files will be scheduled for compaction.
+
+The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} specifies how to compact
+the given list of `Path`s and write the result to a {{< javadoc file="org/apache/flink/connector/file/sink/filesystem/CompactingFileWriter.html" name="CompactingFileWriter">}}. Implementations can be classified into two types according to the type of the given `CompactingFileWriter`:

Review comment:
   The link to `CompactingFileWriter` seems not right. The same applies to the Chinese version.
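The `FileCompactStrategy` trigger described in the added docs is a disjunction of two conditions: compaction is scheduled when either the cached files' total size reaches the size threshold or enough checkpoints have passed since the last compaction. A minimal plain-Java sketch of that decision logic, independent of Flink (the class and method names `CompactTriggerSketch` and `shouldCompact` are illustrative assumptions, not Flink API):

```java
// Illustrative sketch of the compaction trigger logic described in the docs:
// either condition alone is enough to schedule compaction.
// Names here are hypothetical; the real logic lives inside Flink's
// FileCompactStrategy and is configured via setSizeThreshold(...) and
// enableCompactionOnCheckpoint(...).
public class CompactTriggerSketch {

    private final long sizeThreshold;                 // cf. setSizeThreshold(1024)
    private final int numCheckpointsBeforeCompaction; // cf. enableCompactionOnCheckpoint(5)

    public CompactTriggerSketch(long sizeThreshold, int numCheckpointsBeforeCompaction) {
        this.sizeThreshold = sizeThreshold;
        this.numCheckpointsBeforeCompaction = numCheckpointsBeforeCompaction;
    }

    /** The two conditions are parallel: reaching either one schedules compaction. */
    public boolean shouldCompact(long cachedFilesTotalSize, int checkpointsSinceLastCompaction) {
        return cachedFilesTotalSize >= sizeThreshold
                || checkpointsSinceLastCompaction >= numCheckpointsBeforeCompaction;
    }

    public static void main(String[] args) {
        CompactTriggerSketch trigger = new CompactTriggerSketch(1024, 5);
        System.out.println(trigger.shouldCompact(2048, 1)); // size threshold reached
        System.out.println(trigger.shouldCompact(100, 5));  // checkpoint count reached
        System.out.println(trigger.shouldCompact(100, 1));  // neither condition met yet
    }
}
```

This also clarifies why the docs call the two conditions "parallel": neither has priority, and whichever is reached first triggers the compaction of the cached pending files.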