Re: Troubles with Avro migration from 1.7 to 1.10

2020-06-10 Thread Alan Żur
Hi Kostas, 

I'll try it by copying this class into my project for now and wait for the
release. I'm not expecting to finish my migration by then ;) Have a nice day
and thanks for the update - I'll keep this thread open in case I encounter
any problems. Thanks





Re: Troubles with Avro migration from 1.7 to 1.10

2020-06-10 Thread Kostas Kloudas
Hi Alan,

Unfortunately not, but the release is expected to come out in the next
couple of weeks, so it will be available then.
Until then, you can either copy the code of the AvroWriterFactory into
your project and use it from there, or download the project from
GitHub, as you said.

Cheers,
Kostas

On Wed, Jun 10, 2020 at 9:24 AM Alan Żur  wrote:
>
> Hi Kostas,
>
> Is this release available in Maven Central, or should I download the project
> from GitHub?
>
> Thanks,
> Alan


Re: Troubles with Avro migration from 1.7 to 1.10

2020-06-10 Thread Alan Żur
Hi Kostas, 

Is this release available in Maven Central, or should I download the project
from GitHub?

Thanks, 
Alan





Re: Troubles with Avro migration from 1.7 to 1.10

2020-06-09 Thread Kostas Kloudas
Hi Alan,

In the upcoming Flink 1.11 release, there will be support for Avro
using the AvroWriterFactory, as seen in [1].
Do you think this could solve your problem?

You can also download the current release-1.11 branch and test it out
to see if it fits your needs.
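
Roughly, usage should look like the sketch below (based on the current
release-1.11 branch, so details may still change before the release; the
schema would come from your own code, e.g. your ComboFeatureAvroWriter):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Sketch: AvroWriters creates an AvroWriterFactory, which plugs into
// StreamingFileSink as a bulk format.
private static SinkFunction<GenericRecord> createAvroSink(String hdfsPath, Schema schema) {
    return StreamingFileSink
            .forBulkFormat(new Path(hdfsPath), AvroWriters.forGenericRecord(schema))
            .build();
}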

Cheers,
Kostas

[1] https://issues.apache.org/jira/browse/FLINK-11395

On Tue, Jun 9, 2020 at 4:23 PM Alan Żur  wrote:
>
> Hi,
>
> I was assigned to migrate our Flink 1.7 to 1.10, and so far it's going well;
> however, I've encountered a problem with writing Avro to HDFS. [...]

Troubles with Avro migration from 1.7 to 1.10

2020-06-09 Thread Alan Żur


Hi,

I was assigned to migrate our Flink 1.7 to 1.10, and so far it's going well;
however, I've encountered a problem with writing Avro to HDFS. Currently we're
using BucketingSink, which is deprecated. I've managed to replace a few
BucketingSinks with StreamingFileSink in row format. However, I don't have any
idea how to tackle Avro and the Writer<> implementation.

@Override
protected void applySink(DataStream<Feature> outputStream) {
    outputStream
        .keyBy(Feature::getSessionId)
        .addSink(createSink())
        .uid(UID_PART.concat("sink-v1"))
        .name(UID_PART.concat("hdfs_bucketing_sink"));
}

private SinkFunction<GenericRecord> createSFSink() {
    return StreamingFileSink
        .forBulkFormat(Path.fromLocalFile(new File(hdfsPath)),
            ParquetAvroWriters.forGenericRecord(new ComboFeatureAvroWriter().createSchema()))
        .build();
}

private BucketingSink<Feature> createSink() {
    return new BucketingSink<Feature>(hdfsPath)
        .setBucketer(new DateTypeComboFeatureBucketer("yyyy-MM-dd", ZoneOffset.UTC))
        .setBatchSize(batchSize)
        .setBatchRolloverInterval(batchRollingInterval)
        .setInactiveBucketCheckInterval(checkInactiveBucketInterval)
        .setInactiveBucketThreshold(inactiveBucketThreshold)
        .setUseTruncate(useTruncate)
        .setWriter(new ComboFeatureAvroWriter());
}
I took the createSFSink() function above from
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
I've tried changing GenericRecord to the Feature class - failed. I've tried
writing an empty GenericRecord map just to get rid of the compilation error -
failed (still giving an improper type error). I've also tried
ParquetAvroWriters.forSpecificRecord(Feature.class), and that failed as well.
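
For context, the GenericRecord attempt looked roughly like the sketch below
(reconstructed, not my exact code - the field mapping is the part I left
empty, and I've added GenericRecordAvroTypeInfo here, which may be needed so
Flink can handle the GenericRecord element type). The idea is to map Feature
to GenericRecord first, so the stream's element type matches what
ParquetAvroWriters.forGenericRecord(...) expects:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;

// Map Feature to GenericRecord. The schema is built in open() rather than
// captured in a field at construction time, because Avro's Schema is not
// serializable.
public class FeatureToRecord extends RichMapFunction<Feature, GenericRecord> {
    private transient Schema schema;

    @Override
    public void open(Configuration parameters) {
        schema = new ComboFeatureAvroWriter().createSchema();
    }

    @Override
    public GenericRecord map(Feature feature) {
        GenericRecord record = new GenericData.Record(schema);
        // field mapping goes here - this is the part I left empty
        return record;
    }
}

// Wiring it up, so createSFSink() receives GenericRecord elements:
DataStream<GenericRecord> records = outputStream
    .map(new FeatureToRecord())
    .returns(new GenericRecordAvroTypeInfo(new ComboFeatureAvroWriter().createSchema()));
records.addSink(createSFSink());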