Hi Alan,

In the upcoming Flink 1.11 release, there will be support for Avro
using the AvroWriterFactory as seen in [1].
Do you think that this can solve your problem?

You can also download the current release-1.11 branch and test it out to
see if it fits your needs.
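
For reference, with the new factory a bulk-format Avro sink would look roughly
like the sketch below. This is only a minimal sketch against the 1.11 API: it
assumes your Feature class is an Avro-generated SpecificRecord, and the method
name createAvroSink is just illustrative.

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Writes Feature records as Avro files; AvroWriters.forSpecificRecord returns
// the AvroWriterFactory introduced by FLINK-11395 (assumption: Feature is a
// generated SpecificRecord class).
private SinkFunction<Feature> createAvroSink() {
    return StreamingFileSink
            .forBulkFormat(new Path(hdfsPath),
                    AvroWriters.forSpecificRecord(Feature.class))
            .build();
}

There are also AvroWriters.forGenericRecord(schema) and
AvroWriters.forReflectRecord(clazz) variants if Feature is not a generated
SpecificRecord.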

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-11395

On Tue, Jun 9, 2020 at 4:23 PM Alan Żur <alan.zur@fingerprints.digital> wrote:
>
> Hi,
>
> I was assigned to migrate our Flink 1.7 jobs to 1.10 and so far it's going
> well, however I've encountered a problem with writing Avro to HDFS. Currently
> we're using BucketingSink, which is deprecated. I've managed to replace a few
> BucketingSinks with StreamingFileSink using the row format. However, I have
> no idea how to tackle Avro and the Writer<> implementation.
>
> @Override
> protected void applySink(DataStream<Feature> outputStream) {
>     outputStream
>             .keyBy(Feature::getSessionId)
>             .addSink(createSink())
>             .uid(UID_PART.concat("sink-v1"))
>             .name(UID_PART.concat("hdfs_bucketing_sink"));
> }
>
> private SinkFunction<GenericRecord> createSFSink() {
>     return StreamingFileSink
>             .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
>                     ParquetAvroWriters.forGenericRecord(new ComboFeatureAvroWriter().createSchema()))
>             .build();
> }
>
> private BucketingSink<Feature> createSink() {
>     return new BucketingSink<Feature>(hdfsPath)
>             .setBucketer(new DateTypeComboFeatureBucketer("yyyy-MM-dd", ZoneOffset.UTC))
>             .setBatchSize(batchSize)
>             .setBatchRolloverInterval(batchRollingInterval)
>             .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
>             .setInactiveBucketThreshold(inactiveBucketThreshold)
>             .setUseTruncate(useTruncate)
>             .setWriter(new ComboFeatureAvroWriter());
> }
>
> The function createSFSink() above I took from
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> I've tried changing GenericRecord to the Feature class – failed. I've tried
> writing an empty GenericRecord map just to get rid of the compilation error –
> failed (it still gives an improper type error). I've also tried using
> ParquetAvroWriters.forSpecificRecord(Feature.class), which failed as well.
