Hi Alan,

In the upcoming Flink 1.11 release, there will be support for Avro using the AvroWriterFactory, as seen in [1]. Do you think this can solve your problem?
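As a rough, untested sketch of how that could look on the 1.11 API (assuming your Feature class is an Avro-generated SpecificRecord; the createAvroSink name is just illustrative):

```java
// Sketch only: bulk Avro sink via the AvroWriters factory (flink-avro module).
// If Feature is not a generated SpecificRecord, AvroWriters.forReflectRecord(Feature.class)
// is the reflection-based alternative.
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

private SinkFunction<Feature> createAvroSink() {
    return StreamingFileSink
            .forBulkFormat(new Path(hdfsPath),
                    AvroWriters.forSpecificRecord(Feature.class))
            .build();
}
```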
You can also download the current release-1.11 branch and test it out to see if it fits your needs.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-11395

On Tue, Jun 9, 2020 at 4:23 PM Alan Żur <alan.zur@fingerprints.digital> wrote:
>
> Hi,
>
> I was assigned to migrate our Flink 1.7 to 1.10. So far it's going well,
> but I've encountered a problem with writing Avro to HDFS. Currently we're
> using BucketingSink, which is deprecated. I've managed to replace a few
> BucketingSinks with StreamingFileSink using the row format. However, I have
> no idea how to tackle Avro and the Writer<> implementation.
>
> @Override
> protected void applySink(DataStream<Feature> outputStream) {
>     outputStream
>         .keyBy(Feature::getSessionId)
>         .addSink(createSink())
>         .uid(UID_PART.concat("sink-v1"))
>         .name(UID_PART.concat("hdfs_bucketing_sink"));
> }
>
> private SinkFunction<GenericRecord> createSFSink() {
>     return StreamingFileSink
>         .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
>             ParquetAvroWriters.forGenericRecord(new ComboFeatureAvroWriter().createSchema()))
>         .build();
> }
>
> private BucketingSink<Feature> createSink() {
>     return new BucketingSink<Feature>(hdfsPath)
>         .setBucketer(new DateTypeComboFeatureBucketer("yyyy-MM-dd", ZoneOffset.UTC))
>         .setBatchSize(batchSize)
>         .setBatchRolloverInterval(batchRollingInterval)
>         .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
>         .setInactiveBucketThreshold(inactiveBucketThreshold)
>         .setUseTruncate(useTruncate)
>         .setWriter(new ComboFeatureAvroWriter());
> }
>
> I took the createSFSink() function above from
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> I've tried changing GenericRecord to the Feature class – failed. I've tried
> writing an empty GenericRecord map just to get rid of the compilation error –
> failed (it still gives an improper-type error). I've also tried using
> ParquetAvroWriters.forSpecificRecord(Feature.class), which also failed.