Hi,
I am trying to get the Parquet writer to write to S3; however, the files do not seem to be rolling over. The same file, "part-0-0.parquet", is created each time, as if the part counter were not being updated. Or maybe the bucket is being recreated each time? I don't really know. Here are some logs:
2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
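If I read the bucket path correctly, "2020-04-09--01" matches the default hourly bucketing, which makes sense since my custom assigner is commented out in the code below. A minimal sketch of what I believe the sink falls back to (the "yyyy-MM-dd--HH" format is my assumption from the docs, not verified):
```
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// Assumption: with no explicit assigner, StreamingFileSink buckets records
// into one directory per hour of processing time, e.g. "2020-04-09--01",
// so all parts written during the same hour land in the same directory.
BucketAssigner<GenericRecord, String> defaultAssigner =
        new DateTimeBucketAssigner<>("yyyy-MM-dd--HH");
```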
And here is part of my code:
```
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(2);
env.enableCheckpointing(60000L); // checkpoint every 60s

// Kinesis consumer properties (region, credentials, etc.) are set here; omitted.
Properties consumerConfig = new Properties();

// Schema of the Avro-generated bro_conn class.
Schema schema = bro_conn.getClassSchema();

OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartSuffix(".parquet")
        .build();

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
                ParquetAvroWriters.forGenericRecord(schema))
        // .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(config)
        // .withBucketAssigner(new PartitioningBucketAssigner())
        .build();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
        "kinesis", new SimpleStringSchema(), consumerConfig));

// JsonAvroParser maps the raw JSON strings to Avro GenericRecords.
kinesis.flatMap(new JsonAvroParser())
        .addSink(sink);

env.execute("Bro Conn");
```
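For what it's worth, my understanding is that bulk formats can only roll on checkpoint, so the commented-out rolling policy line should be equivalent to the default. This is the variant I would have expected to behave the same (a sketch under that assumption, not a confirmed fix):
```
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// Same sink as above, but with the roll-on-checkpoint policy set explicitly.
// Assumption: for forBulkFormat this is already the default behavior, so a
// new part file (part-0-1, part-0-2, ...) should appear after each checkpoint.
final StreamingFileSink<GenericRecord> explicitSink = StreamingFileSink
        .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
                ParquetAvroWriters.forGenericRecord(schema))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(config)
        .build();
```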
I'm using Flink 1.10.0 and running in Kubernetes. I also created a custom image that adds the Presto/Hadoop S3 filesystem plugins.
Thanks again!