Hello, I was wondering if I could get some pointers on what I'm doing wrong here. I posted this on stack overflow <https://stackoverflow.com/questions/49655460/flink-does-not-checkpoint-and-bucketingsink-leaves-files-in-pending-state-when>, but I thought I'd also ask here.
I'm trying to generate some test data using a collection, and write that data to s3, Flink doesn't seem to do any checkpointing at all when I do this, but it does do checkpointing when the source comes from s3. For example, this DOES checkpoint and leaves output files in a completed state: ```scala val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setMaxParallelism(128) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.enableCheckpointing(2000L) env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints")) val lines: DataStream[String] = { val path = "s3a://my_bucket/simple_job/in" env .readFile( inputFormat = new TextInputFormat(new Path(path)), filePath = path, watchType = FileProcessingMode.PROCESS_CONTINUOUSLY, interval = 5000L ) } val sinkFunction: BucketingSink[String] = new BucketingSink[String]("s3a://my_bucket/simple_job/out") .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")) lines.addSink(sinkFunction) env.execute() ``` Meanwhile, this DOES NOT checkpoint, and leaves files in a .pending state even after the job has finished: ```scala val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setMaxParallelism(128) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.enableCheckpointing(2000L) env.setStateBackend(new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints")) val lines: DataStream[String] = env.fromCollection((1 to 100).map(_.toString)) val sinkFunction: BucketingSink[String] = new BucketingSink[String]("s3a://my_bucket/simple_job/out") .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")) lines.addSink(sinkFunction) env.execute() ``` Is this a bug in flink or something I'm doing wrong? Thank you!