[jira] [Updated] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-06-03 Thread ASF GitHub Bot (Jira)


 [ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-26050:
---
Labels: pull-request-available  (was: )

> Too many small sst files in rocksdb state backend when using time window 
> created in ascending order
> ---
>
> Key: FLINK-26050
> URL: https://issues.apache.org/jira/browse/FLINK-26050
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.10.2, 1.14.3
>Reporter: shen
>Assignee: Roman Khachatryan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.20.0
>
> Attachments: image-2022-02-09-21-22-13-920.png, 
> image-2022-02-11-10-32-14-956.png, image-2022-02-11-10-36-46-630.png, 
> image-2022-02-14-13-04-52-325.png
>

[jira] [Updated] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-05-29 Thread Roman Khachatryan (Jira)


 [ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Khachatryan updated FLINK-26050:
--
Fix Version/s: 1.20.0


[jira] [Updated] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-02-12 Thread Piotr Nowojski (Jira)


 [ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-26050:
---
Description: 
When using processing-time or event-time windows, some workloads produce a large 
number of small SST files (several KB each) in the RocksDB local directory, which 
may eventually cause a "too many open files" error.

Inspecting the contents of these SST files with the RocksDB ldb tool shows 
(example invocations follow the list):
 * the column family of these small SST files is "processing_window-timers";
 * most of the SST files are in level 1;
 * the records in the SST files are almost all kTypeDeletion entries;
 * the creation times of the SST files correspond to the checkpoint interval.
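
For reference, an inspection along these lines can be done with the RocksDB 
command-line tools. The paths below are placeholders, and the exact options can 
differ between RocksDB versions:
{code}
# list the column families in the task manager's local RocksDB working copy
ldb --db=/path/to/rocksdb_local_db/.../db list_column_families

# raw-dump one of the small SST files; tombstones show up as kTypeDeletion entries
sst_dump --file=/path/to/db/000123.sst --command=raw

# the table properties also report the number of deletion entries per file
sst_dump --file=/path/to/db/000123.sst --show_properties
{code}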

These small SST files appear to be generated every time a Flink checkpoint is 
triggered. Although their contents are exclusively delete tags, RocksDB 
compaction never merges or removes them: the files do not intersect one another, 
so compaction just trivially moves them between levels (see RocksDB [compaction 
trivial move|https://github.com/facebook/rocksdb/wiki/Compaction-Trivial-Move]). 
Because they are small and disjoint from every other SST file, there seems to be 
no opportunity for them ever to be deleted.

I will attach a simple program that reproduces the problem.

Timers for processing-time windows are created, and later deleted, in strictly 
ascending order. So if the job's workload happens to generate level-0 SST files 
that do not intersect one another (for example, when the window size is much 
smaller than the checkpoint interval and no window content crosses a checkpoint 
boundary), many small SST files keep accumulating until the job is restored from 
a savepoint or incremental checkpointing is disabled.
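
The accumulation mechanism can be illustrated outside Flink with plain RocksDB. 
The sketch below uses the RocksJava API (an assumption; this is not part of the 
attached reproduction program, and the database path is hypothetical). It writes 
and then deletes keys in strictly ascending order and flushes after each batch, 
the way a checkpoint does; each flush leaves a tombstone-only level-0 file whose 
key range is disjoint from every earlier file, so compaction can only trivially 
move the files down and never drops them.
{code:java}
import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class TombstoneAccumulation {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options opts = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(opts, "/tmp/tombstone-demo"); // hypothetical path
         FlushOptions flush = new FlushOptions().setWaitForFlush(true)) {
      long nextTimer = 0;
      for (int checkpoint = 0; checkpoint < 20; checkpoint++) {
        // Timers are registered and fired in ascending order, so every
        // "checkpoint interval" touches a fresh, disjoint key range.
        for (int i = 0; i < 100; i++) {
          byte[] key = String.format("timer-%012d", nextTimer++).getBytes();
          db.put(key, new byte[0]); // timer registered
          db.delete(key);           // timer fired and removed before the flush
        }
        db.flush(flush); // a checkpoint forces a memtable flush like this
        System.out.printf("after flush %d: L0=%s, L1=%s%n",
            checkpoint,
            db.getProperty("rocksdb.num-files-at-level0"),
            db.getProperty("rocksdb.num-files-at-level1"));
      }
    }
  }
}
{code}
Each iteration leaves behind a small SST file containing only deletions; because 
the key ranges never overlap, the file counts keep growing instead of being 
compacted away.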

 

A similar problem may exist when users register timers in their own operators 
under the same kind of workload.

 

Code to reproduce the problem:
{code:java}
package org.apache.flink.jira;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;
import java.util.Random;

@Slf4j
public class StreamApp {
  public static void main(String[] args) throws Exception {
    Configuration config = new Configuration();
    config.set(RestOptions.ADDRESS, "127.0.0.1");
    config.set(RestOptions.PORT, 10086);
    config.set(TaskManagerOptions.NUM_TASK_SLOTS, 6);
    new StreamApp().configureApp(StreamExecutionEnvironment.createLocalEnvironment(1, config));
  }

  public void configureApp(StreamExecutionEnvironment env) throws Exception {
    env.enableCheckpointing(20_000); // 20 sec (the archived copy read "2", contradicting this comment)

    RocksDBStateBackend rocksDBStateBackend =
        new RocksDBStateBackend(
            "file:///Users/shenjiaqi/Workspace/jira/flink-51/checkpoints/", true); // need to be reconfigured
    rocksDBStateBackend.setDbStoragePath(
        "/Users/shenjiaqi/Workspace/jira/flink-51/flink/rocksdb_local_db"); // need to be reconfigured
    env.setStateBackend(rocksDBStateBackend);

    env.getCheckpointConfig().setCheckpointTimeout(10);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setTaskCancellationInterval(1);

    for (int i = 0; i < 1; ++i) {
      createOnePipeline(env);
    }

    env.execute("StreamApp");
  }

  private void createOnePipeline(StreamExecutionEnvironment env) {
    // The source is configured so that few windows cross a checkpoint interval.
    DataStreamSource<String> stream = env.addSource(new Generator(1, 3000, 3600));

    stream.keyBy(x -> x)
        // Make sure the window size is smaller than the checkpoint interval. 100 ms is
        // very small, but larger values should also reproduce the problem, just more slowly.
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
        .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
          @Override
          public void process(String key, Context context,
              Iterable<String> elements, Collector<String> out) {
            for (String ele : elements) {
              out.collect(ele);
            }
          }
        }).print();
  }

  public static final class Generator
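      // NOTE: the archived message is truncated at this point. The body below is a
      // plausible reconstruction inferred from the imports above (SourceFunction,
      // ListCheckpointed, Random, Collections, List) and from the call
      // new Generator(1, 3000, 3600); it is not the reporter's original code.
      implements SourceFunction<String>, ListCheckpointed<Long> {

    private final int numKeys;
    private final int idlenessMs;
    private final int durationSeconds;

    private final Random random = new Random();
    private volatile boolean running = true;
    private long numRecordsEmitted;

    Generator(int numKeys, int idlenessMs, int durationSeconds) {
      this.numKeys = numKeys;
      this.idlenessMs = idlenessMs;
      this.durationSeconds = durationSeconds;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
      long deadline = System.currentTimeMillis() + durationSeconds * 1000L;
      while (running && System.currentTimeMillis() < deadline) {
        synchronized (ctx.getCheckpointLock()) {
          // Few keys, emitted slowly, so consecutive records land in different
          // 100 ms windows and each window closes well before the next checkpoint.
          ctx.collect("key-" + random.nextInt(numKeys));
          numRecordsEmitted++;
        }
        Thread.sleep(idlenessMs);
      }
    }

    @Override
    public void cancel() {
      running = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
      return Collections.singletonList(numRecordsEmitted);
    }

    @Override
    public void restoreState(List<Long> state) {
      for (Long s : state) {
        numRecordsEmitted += s;
      }
    }
  }
}
{code}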

[jira] [Updated] (FLINK-26050) Too many small sst files in rocksdb state backend when using time window created in ascending order

2024-02-12 Thread Piotr Nowojski (Jira)


 [ https://issues.apache.org/jira/browse/FLINK-26050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-26050:
---
Summary: Too many small sst files in rocksdb state backend when using time 
window created in ascending order  (was: Too many small sst files in rocksdb 
state backend when using processing time window)
