Hi Tao,

Is checkpointing enabled in your app? Pending files are only moved to the
finished state once a checkpoint completes, so without checkpointing they
stay pending forever.

Please take a look at the BucketingSink Javadoc
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>:
"If checkpointing is not enabled the pending files will never be moved to
the finished state"

Thanks,
Vipul



On Mon, Dec 18, 2017 at 4:30 PM, Tao Xia <t...@udacity.com> wrote:

> Hi All,
>   Do you guys write Parquet files using BucketingSink? I ran into an issue
> where all the Parquet files stay in the pending state. Any ideas?
>
> processedStream is a DataStream of NDEvent.
>
> Output files are all like this one "_part-0-0.pending"
>
> val parquetSink = new BucketingSink[NDEvent]("/tmp/")
> parquetSink.setBucketer(new DateTimeBucketer[NDEvent]("yyyy-MM-dd/HH"))
> parquetSink.setWriter(new SinkParquetWriter(NDEvent.getClassSchema.toString))
> processedStream.addSink(parquetSink)
>
> import java.io.IOException;
>
> import org.apache.avro.Schema;
> import org.apache.flink.streaming.connectors.fs.Writer;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.hadoop.ParquetWriter;
> import org.apache.parquet.hadoop.metadata.CompressionCodecName;
>
> public class SinkParquetWriter<T> implements Writer<T> {
>
>     transient ParquetWriter<T> writer = null;
>     String schema = null;
>
>     public SinkParquetWriter(String schema) {
>         // the ParquetWriter itself is created lazily in open()
>         this.schema = schema;
>     }
>
>     public void open(FileSystem fileSystem, Path path) throws IOException {
>         writer = AvroParquetWriter.<T>builder(path)
>                 .withSchema(new Schema.Parser().parse(schema))
>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>                 .build();
>     }
>
>     public long flush() throws IOException {
>         // ParquetWriter exposes no public flush; report the current data size
>         return writer.getDataSize();
>     }
>
>     public long getPos() throws IOException {
>         return writer.getDataSize();
>     }
>
>     public void close() throws IOException {
>         writer.close();
>     }
>
>     public void write(T t) throws IOException {
>         writer.write(t);
>     }
>
>     public Writer<T> duplicate() {
>         return new SinkParquetWriter<T>(schema);
>     }
> }
>
>
> Thanks,
> Tao
>



