gaoyunhaii commented on a change in pull request #18797:
URL: https://github.com/apache/flink/pull/18797#discussion_r808637597



##########
File path: docs/content/docs/connectors/datastream/filesystem.md
##########
@@ -958,6 +958,73 @@ val sink = FileSink
 {{< /tab >}}
 {{< /tabs >}}
 
+### Compaction
+
+Since version 1.15, `FileSink` supports compaction of the `pending` files,
+which allows the application to use a smaller checkpoint interval without generating a large number of small files,
+especially when using the [bulk-encoded formats]({{< ref "docs/connectors/datastream/filesystem#bulk-encoded-formats" >}})
+that have to roll on every checkpoint.
+
+Compaction can be enabled with:
+
+{{< tabs "enablecompaction" >}}
+{{< tab "Java" >}}
+```java
+FileSink<Integer> fileSink =
+    FileSink.forRowFormat(new Path(path), new SimpleStringEncoder<Integer>())
+        .enableCompact(
+            // Trigger compaction once 1024 bytes of pending files are cached
+            // or 5 checkpoints have passed since the last compaction.
+            FileCompactStrategy.Builder.newBuilder()
+                .setSizeThreshold(1024)
+                .enableCompactionOnCheckpoint(5)
+                .build(),
+            // Compact by re-reading the records and writing them into one file.
+            new RecordWiseFileCompactor<>(
+                new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
+        .build();
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val fileSink: FileSink[Integer] =
+  FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]())
+    .enableCompact(
+      // Trigger compaction once 1024 bytes of pending files are cached
+      // or 5 checkpoints have passed since the last compaction.
+      FileCompactStrategy.Builder.newBuilder()
+        .setSizeThreshold(1024)
+        .enableCompactionOnCheckpoint(5)
+        .build(),
+      // Compact by re-reading the records and writing them into one file.
+      new RecordWiseFileCompactor(
+        new DecoderBasedReader.Factory(() => new SimpleStringDecoder)))
+    .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Once enabled, compaction happens between the time files become `pending` and the time they get committed. The pending files will
+first be committed to temporary files whose names start with `.`. Then these files will be compacted according to
+the strategy by the compactor specified by the user, and new compacted pending files will be generated.
+These compacted pending files will then be emitted to the committer to be committed to the final files, after which the source files will be removed.
+
+When enabling compaction, you need to specify both the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}}
+and the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}}.
+
+The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}} specifies
+when and which files get compacted. Currently, there are two parallel conditions: the target file size and the number of checkpoints passed.
+Once the total size of the cached files has reached the size threshold, or the number of checkpoints since the last compaction has reached the specified number,
+the cached files will be scheduled for compaction.
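+
+Since the two conditions are parallel, either one can trigger compaction on its own. For instance, a strategy that schedules compaction purely on accumulated size might look like the following sketch (the 16 MiB threshold is an arbitrary illustration, and omitting the checkpoint-count condition assumes the builder treats it as optional):
+
+```java
+// Hypothetical size-only strategy: compaction is scheduled as soon as the
+// total size of the cached pending files exceeds 16 MiB.
+FileCompactStrategy sizeBasedStrategy =
+    FileCompactStrategy.Builder.newBuilder()
+        .setSizeThreshold(16 * 1024 * 1024)
+        .build();
+```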
+
+The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} specifies how to compact
+the given list of `Path`s and write the result to the {{< javadoc file="org/apache/flink/connector/file/sink/filesystem/CompactingFileWriter.html" name="CompactingFileWriter">}}. It can be classified into two types according to the type of the given `CompactingFileWriter`:
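+
+As a rough sketch of the two variants (assuming the `ConcatFileCompactor` class from the same `org.apache.flink.connector.file.sink.compactor` package; verify the exact constructors against the 1.15 API):
+
+```java
+// OutputStream-based variant: concatenates the raw bytes of the source
+// files, suitable for formats where files can simply be appended.
+FileCompactor concatCompactor = new ConcatFileCompactor();
+
+// Record-wise variant: re-reads the records from the source files and
+// writes them out again, here via a decoder-based reader.
+FileCompactor recordWiseCompactor =
+    new RecordWiseFileCompactor<>(
+        new DecoderBasedReader.Factory<>(SimpleStringDecoder::new));
+```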

Review comment:
       The link to `CompactingFileWriter` seems incorrect. The same issue exists in the Chinese version.



