Hi All,
  Do you guys write Parquet files using BucketingSink? I've run into an issue
where all of the Parquet files stay in pending status. Any ideas?

processedStream is a DataStream of NDEvent.

The output files all look like this one: "_part-0-0.pending"

val parquetSink = new BucketingSink[NDEvent]("/tmp/")
parquetSink.setBucketer(new DateTimeBucketer[NDEvent]("yyyy-MM-dd/HH"))
parquetSink.setWriter(new SinkParquetWriter(NDEvent.getClassSchema.toString))
processedStream.addSink(parquetSink)
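
(One thing I'm wondering about: the BucketingSink docs say pending files are
only moved to their finished state once a checkpoint completes, so maybe
checkpointing needs to be enabled on the environment. A minimal sketch of
that, with an arbitrary interval:)

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Without checkpointing, BucketingSink never finalizes ".pending" files
env.enableCheckpointing(60000L) // e.g. checkpoint every 60 seconds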

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class SinkParquetWriter<T> implements Writer<T> {

    // ParquetWriter is not serializable, so it is only created in open().
    transient ParquetWriter<T> writer = null;
    String schema = null;

    public SinkParquetWriter(String schema) {
        this.schema = schema;
    }

    @Override
    public void open(FileSystem fileSystem, Path path) throws IOException {
        writer = AvroParquetWriter.<T>builder(path)
                .withSchema(new Schema.Parser().parse(schema))
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build();
    }

    @Override
    public long flush() throws IOException {
        // ParquetWriter buffers internally and has no real flush, so the
        // in-memory data size is the best available position estimate.
        return writer.getDataSize();
    }

    @Override
    public long getPos() throws IOException {
        return writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }

    @Override
    public void write(T t) throws IOException {
        writer.write(t);
    }

    @Override
    public Writer<T> duplicate() {
        // Returns a fresh, unopened writer with the same schema.
        return new SinkParquetWriter<T>(schema);
    }
}
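
For anyone debugging something similar, here is a hypothetical standalone
smoke test I'd use to rule out the Parquet side, independent of Flink. The
inline schema and output path are stand-ins, not my real NDEvent schema:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

object ParquetSmokeTest {
  def main(args: Array[String]): Unit = {
    // Tiny stand-in record schema with a single string field.
    val schema = new Schema.Parser().parse(
      """{"type":"record","name":"NDEvent","fields":[{"name":"id","type":"string"}]}""")
    val rec: GenericRecord = new GenericData.Record(schema)
    rec.put("id", "event-1")

    val writer = AvroParquetWriter.builder[GenericRecord](new Path("/tmp/smoke.parquet"))
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build()
    writer.write(rec)
    writer.close() // the Parquet file is only complete after close()
  }
}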


Thanks,
Tao
