Re: Troubles with Avro migration from 1.7 to 1.10
Hi Kostas,

I'll try it by copying this class into my project for now, and wait for the release. I'm not expecting to finish my migration by then ;) Have a nice day and thanks for the update - I'll keep this thread open in case I encounter any problems.

Thanks

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Troubles with Avro migration from 1.7 to 1.10
Hi Alan,

Unfortunately not, but the release is expected to come out in the next couple of weeks, so it will be available then. Until then, you can either copy the code of the AvroWriterFactory into your project and use it from there, or download the project from GitHub, as you said.

Cheers,
Kostas

On Wed, Jun 10, 2020 at 9:24 AM Alan Żur wrote:
>
> Hi Kostas,
>
> is this release available in Maven Central or should I download the project
> from GitHub?
>
> Thanks,
> Alan
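For readers following the "copy the class" suggestion, here is a minimal, hedged sketch of what using the copied AvroWriterFactory directly could look like. It assumes the class is copied from the release-1.11 branch (where its constructor takes an AvroBuilder callback that creates a DataFileWriter over the sink's output stream), that Feature is an Avro-generated class with the usual static getClassSchema() accessor, and that Avro is on the classpath; none of this code is from the thread itself.

```java
import java.io.OutputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
// Assumed copied from the Flink release-1.11 branch:
import org.apache.flink.formats.avro.AvroWriterFactory;

public class CopiedFactorySketch {
    // Builds a writer factory for the (assumed) Avro-generated Feature class.
    // The lambda is the AvroBuilder: given the sink's OutputStream, it returns
    // a DataFileWriter that appends Feature records as an Avro container file.
    static AvroWriterFactory<Feature> createFactory() {
        return new AvroWriterFactory<>((OutputStream out) -> {
            Schema schema = Feature.getClassSchema();
            DatumWriter<Feature> datumWriter = new SpecificDatumWriter<>(schema);
            DataFileWriter<Feature> fileWriter = new DataFileWriter<>(datumWriter);
            fileWriter.create(schema, out);
            return fileWriter;
        });
    }
}
```

The factory can then be passed to StreamingFileSink.forBulkFormat in place of a Parquet writer factory.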
Re: Troubles with Avro migration from 1.7 to 1.10
Hi Kostas,

is this release available in Maven Central or should I download the project from GitHub?

Thanks,
Alan
Re: Troubles with Avro migration from 1.7 to 1.10
Hi Alan,

The upcoming Flink 1.11 release will include support for Avro through the AvroWriterFactory, as described in [1]. Do you think this can solve your problem? You can also download the current release-1.11 branch and test it out to see if it fits your needs.

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-11395

On Tue, Jun 9, 2020 at 4:23 PM Alan Żur wrote:
>
> Hi,
>
> I was assigned to migrate our Flink 1.7 to 1.10. So far it's going well,
> but I've encountered a problem with Avro writing to HDFS. Currently we're
> using BucketingSink, which is deprecated. I've managed to replace a few
> BucketingSinks with StreamingFileSink with the row format. However, I don't
> have any idea how to tackle Avro and the Writer<> implementation.
>
> @Override
> protected void applySink(DataStream outputStream) {
>     outputStream
>         .keyBy(Feature::getSessionId)
>         .addSink(createSink())
>         .uid(UID_PART.concat("sink-v1"))
>         .name(UID_PART.concat("hdfs_bucketing_sink"));
> }
>
> private SinkFunction createSFSink() {
>     return StreamingFileSink
>         .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
>             ParquetAvroWriters.forGenericRecord(new ComboFeatureAvroWriter().createSchema()))
>         .build();
> }
>
> private BucketingSink createSink() {
>     return new BucketingSink(hdfsPath)
>         .setBucketer(new DateTypeComboFeatureBucketer("-MM-dd", ZoneOffset.UTC))
>         .setBatchSize(batchSize)
>         .setBatchRolloverInterval(batchRollingInterval)
>         .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
>         .setInactiveBucketThreshold(inactiveBucketThreshold)
>         .setUseTruncate(useTruncate)
>         .setWriter(new ComboFeatureAvroWriter());
> }
>
> I took the function createSFSink() above from
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> I've tried changing GenericRecord to the Feature class - failed. I've tried
> to write an empty GenericRecord map just to get rid of the compilation
> error - failed (still giving an improper-type error). I've also tried to use
> ParquetAvroWriters.forSpecificRecord(Feature.class) and that also failed.
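To make the 1.11 suggestion concrete, here is a minimal sketch of how the new Avro support could be wired into a StreamingFileSink once 1.11 is out. It assumes flink-avro 1.11 on the classpath, and reuses the Feature class and hdfsPath field from the question below; AvroWriters is the 1.11 helper that builds an AvroWriterFactory, so treat the exact names as subject to the final release.

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class AvroSinkSketch {
    // Builds a bulk-format sink that writes Avro container files,
    // using the AvroWriterFactory machinery added by FLINK-11395.
    static StreamingFileSink<Feature> createAvroSink(String hdfsPath) {
        return StreamingFileSink
            .forBulkFormat(new Path(hdfsPath),
                AvroWriters.forSpecificRecord(Feature.class))
            .build();
    }
}
```

AvroWriters also offers forGenericRecord(Schema) and forReflectRecord(Class) variants for non-generated record types.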
Troubles with Avro migration from 1.7 to 1.10
Hi,

I was assigned to migrate our Flink 1.7 to 1.10. So far it's going well, but I've encountered a problem with Avro writing to HDFS. Currently we're using BucketingSink, which is deprecated. I've managed to replace a few BucketingSinks with StreamingFileSink with the row format. However, I don't have any idea how to tackle Avro and the Writer<> implementation.

@Override
protected void applySink(DataStream outputStream) {
    outputStream
        .keyBy(Feature::getSessionId)
        .addSink(createSink())
        .uid(UID_PART.concat("sink-v1"))
        .name(UID_PART.concat("hdfs_bucketing_sink"));
}

private SinkFunction createSFSink() {
    return StreamingFileSink
        .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
            ParquetAvroWriters.forGenericRecord(new ComboFeatureAvroWriter().createSchema()))
        .build();
}

private BucketingSink createSink() {
    return new BucketingSink(hdfsPath)
        .setBucketer(new DateTypeComboFeatureBucketer("-MM-dd", ZoneOffset.UTC))
        .setBatchSize(batchSize)
        .setBatchRolloverInterval(batchRollingInterval)
        .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
        .setInactiveBucketThreshold(inactiveBucketThreshold)
        .setUseTruncate(useTruncate)
        .setWriter(new ComboFeatureAvroWriter());
}

I took the function createSFSink() above from https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
I've tried changing GenericRecord to the Feature class - failed. I've tried to write an empty GenericRecord map just to get rid of the compilation error - failed (still giving an improper-type error). I've also tried to use ParquetAvroWriters.forSpecificRecord(Feature.class) and that also failed.
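For anyone hitting the same migration, here is a hedged sketch (not the thread author's actual code) of what the Flink 1.10 Parquet path could look like once the types line up. It assumes Feature is an Avro-generated SpecificRecord, that flink-parquet is on the classpath, and that the DataStream feeding the sink is typed as DataStream<Feature> - a raw DataStream is one common cause of the kind of "improper type" compile errors described above. The "yyyy-MM-dd" pattern is a guess at the intended daily bucketing (the pattern in the quoted code appears truncated by the archive).

```java
import java.time.ZoneOffset;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class FeatureParquetSinkSketch {
    // Bulk-format Parquet sink for an Avro SpecificRecord; the sink's type
    // parameter must match the DataStream<Feature> it is attached to.
    static StreamingFileSink<Feature> create(String hdfsPath) {
        return StreamingFileSink
            .forBulkFormat(new Path(hdfsPath),
                ParquetAvroWriters.forSpecificRecord(Feature.class))
            // Replaces BucketingSink#setBucketer: one bucket per day, UTC.
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneOffset.UTC))
            .build();
    }
}
```

Note that bulk formats roll on checkpoint rather than on the size/interval thresholds BucketingSink exposed, so the setBatchSize/setBatchRolloverInterval settings have no direct equivalent here.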