Hello, I was wondering if I could get some pointers on what I'm doing wrong
here. I posted this on Stack Overflow
<https://stackoverflow.com/questions/49655460/flink-does-not-checkpoint-and-bucketingsink-leaves-files-in-pending-state-when>,
but I thought I'd also ask here.

I'm trying to generate some test data from a collection and write it to S3.
Flink doesn't seem to do any checkpointing at all in that case, but it does
checkpoint when the source reads from S3.

For example, this DOES checkpoint and leaves output files in a completed
state:

```scala
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Checkpoint every 2 seconds, keeping state in RocksDB on S3.
env.enableCheckpointing(2000L)
env.setStateBackend(
  new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

// Continuously watch an S3 prefix for new files.
val lines: DataStream[String] = {
  val path = "s3a://my_bucket/simple_job/in"
  env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = 5000L
  )
}

// Bucket output files by date and time, down to the minute.
val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()
```

Meanwhile, this DOES NOT checkpoint, and leaves files in a .pending state
even after the job has finished:


```scala
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

env.setMaxParallelism(128)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Identical checkpoint configuration to the job above.
env.enableCheckpointing(2000L)
env.setStateBackend(
  new RocksDBStateBackend("s3a://my_bucket/simple_job/rocksdb_checkpoints"))

// The only change: a bounded in-memory collection as the source.
val lines: DataStream[String] =
  env.fromCollection((1 to 100).map(_.toString))

val sinkFunction: BucketingSink[String] =
  new BucketingSink[String]("s3a://my_bucket/simple_job/out")
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))

lines.addSink(sinkFunction)

env.execute()
```
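One thing I've been wondering, but haven't verified: fromCollection is a
bounded source that finishes almost immediately, so maybe the job reaches
FINISHED before the first 2-second checkpoint ever fires, and I believe
BucketingSink only finalizes .pending files when a checkpoint completes. If
that's the cause, a source that emits the same elements but stays alive
should checkpoint normally. Here's a rough sketch of what I mean;
HoldOpenCollectionSource is just a name I made up for this test:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Emits the same test data as fromCollection, but keeps the source
// thread alive afterwards so checkpoints have a chance to run.
class HoldOpenCollectionSource(elements: Seq[String])
    extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    elements.foreach(e => ctx.collect(e))
    // Don't return: once the source finishes, the job moves towards
    // FINISHED and no further checkpoints are taken.
    while (running) Thread.sleep(100L)
  }

  override def cancel(): Unit = running = false
}

// Drop-in replacement for the fromCollection line above:
val lines: DataStream[String] =
  env.addSource(new HoldOpenCollectionSource((1 to 100).map(_.toString)))
```

If the output still ends up stuck in .pending with that source running, then
presumably the bounded source isn't the problem.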
Is this a bug in Flink, or is it something I'm doing wrong? Thank you!
