Hi All,

Does anyone write Parquet files using BucketingSink? I'm running into an issue where all of the Parquet files stay in pending status. Any ideas?
processedStream is a DataStream of NDEvent. The output files all end up looking like "_part-0-0.pending":

val parquetSink = new BucketingSink[NDEvent]("/tmp/")
parquetSink.setBucketer(new DateTimeBucketer[NDEvent]("yyyy-MM-dd/HH"))
parquetSink.setWriter(new SinkParquetWriter(NDEvent.getClassSchema.toString))
processedStream.addSink(parquetSink)

The writer is implemented as follows (imports included for completeness):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class SinkParquetWriter<T> implements Writer<T> {

    // The ParquetWriter is created lazily in open(), so only the schema
    // string needs to be carried across serialization.
    private transient ParquetWriter<T> writer;
    private final String schema;

    public SinkParquetWriter(String schema) {
        this.schema = schema;
    }

    @Override
    public void open(FileSystem fileSystem, Path path) throws IOException {
        writer = AvroParquetWriter.<T>builder(path)
                .withSchema(new Schema.Parser().parse(schema))
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
    }

    @Override
    public long flush() throws IOException {
        return writer.getDataSize();
    }

    @Override
    public long getPos() throws IOException {
        return writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }

    @Override
    public void write(T t) throws IOException {
        writer.write(t);
    }

    @Override
    public Writer<T> duplicate() {
        return new SinkParquetWriter<T>(schema);
    }
}
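One thing I'm wondering about, though this is just a guess: as far as I understand, BucketingSink only moves files from pending to final state when a checkpoint completes, so the files might stay pending simply because checkpointing isn't enabled. A minimal sketch of what I mean (the 60-second interval here is an arbitrary example, not what my job uses):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Enable checkpointing so BucketingSink can promote .pending files
// once a checkpoint completes (example interval: 60 seconds).
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000)

Thanks,
Tao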