Got it, my bad. I should have used a bucket assigner. This seems to be working fine:
StreamingFileSink.forBulkFormat[Request](
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Request]))
  .withBucketAssigner(new DateTimeBucketAssigner[Request]()) // wall-clock buckets, default format yyyy-MM-dd--HH
  .withBucketCheckInterval(5000L) // inspect buckets every 5 s
  .build()
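In case it helps anyone else, here is a minimal sketch of the full wiring. This assumes a DataStream[Request] named requestStream plus outputPath and env already defined (those names are placeholders):

import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner

// Bulk formats roll a new part file on every checkpoint, so the
// checkpoint interval is what controls how many part files show up.
val sink: StreamingFileSink[Request] = StreamingFileSink
  .forBulkFormat[Request](
    new Path(outputPath),
    ParquetAvroWriters.forReflectRecord(classOf[Request]))
  .withBucketAssigner(new DateTimeBucketAssigner[Request]())
  .withBucketCheckInterval(5000L)
  .build()

requestStream.addSink(sink)
env.execute("kafka-to-parquet") // job name is a placeholder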
On Sun, Dec 9, 2018 at 2:13 PM Avi Levi <[email protected]> wrote:
> Hi,
> I am trying to read from Kafka and write to Parquet, but I am getting
> thousands of ".part-0-0.inprogress..." files (and counting ...).
> Is that a bug, or am I doing something wrong?
>
> object StreamParquet extends App {
>   implicit val env: StreamExecutionEnvironment =
>     StreamExecutionEnvironment.getExecutionEnvironment
>   env.enableCheckpointing(100)
>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>   env.getCheckpointConfig.setCheckpointTimeout(600)
>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   env.setParallelism(1)
>   val consumer = new FlinkKafkaConsumer011[Address](SOURCE_TOPIC,
>     new AddressSchema(), consumerProperties)
>   val stream: DataStream[Address] = env.addSource(consumer)
>   val outputPath = "streaming_files"
>   val sink = StreamingFileSink.forBulkFormat(
>     new Path(outputPath),
>     ParquetAvroWriters.forReflectRecord(classOf[Address])).build()
>   stream.addSink(sink)
>   env.execute("Write to file")
> }