Hi Tao,

Is checkpointing enabled in your app? The pending files are only moved to the finished (non-pending) state once a checkpoint completes, so with checkpointing disabled they stay pending forever.
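If it isn't, enabling it in the job setup should be enough. A minimal sketch (the 60-second interval is just an example value, and "env" is your job's StreamExecutionEnvironment):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 60 seconds; on each completed checkpoint the
    // BucketingSink moves its pending files to the finished state.
    env.enableCheckpointing(60000)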
Please take a look at this link
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html>:
"If checkpointing is not enabled the pending files will never be moved to the finished state"

Thanks,
Vipul

On Mon, Dec 18, 2017 at 4:30 PM, Tao Xia <t...@udacity.com> wrote:
> Hi All,
> Do you guys write parquet files using BucketingSink? I ran into an issue
> where all the parquet files stay in the pending state. Any ideas?
>
> processedStream is a DataStream of NDEvent.
>
> The output files all look like this one: "_part-0-0.pending"
>
> val parquetSink = new BucketingSink[NDEvent]("/tmp/")
> parquetSink.setBucketer(new DateTimeBucketer[NDEvent]("yyyy-MM-dd/HH"))
> parquetSink.setWriter(new SinkParquetWriter(NDEvent.getClassSchema.toString))
> processedStream.addSink(parquetSink)
>
> public class SinkParquetWriter<T> implements Writer<T> {
>
>     transient ParquetWriter writer = null;
>     String schema = null;
>
>     public SinkParquetWriter(String schema) {
>         // the writer itself is created in open()
>         this.schema = schema;
>     }
>
>     public void open(FileSystem fileSystem, Path path) throws IOException {
>         writer = AvroParquetWriter.builder(path)
>                 .withSchema(new Schema.Parser().parse(schema))
>                 .withCompressionCodec(CompressionCodecName.SNAPPY)
>                 .build();
>     }
>
>     public long flush() throws IOException {
>         return writer.getDataSize();
>     }
>
>     public long getPos() throws IOException {
>         return writer.getDataSize();
>     }
>
>     public void close() throws IOException {
>         writer.close();
>     }
>
>     public void write(T t) throws IOException {
>         writer.write(t);
>     }
>
>     public Writer<T> duplicate() {
>         return new SinkParquetWriter<T>(schema);
>     }
> }
>
> Thanks,
> Tao

--
Thanks,
Vipul