Re: AvroWriter for Rolling sink
[FLINK-3854] Support Avro key-value rolling sink writer #1953

On 27 April 2016 at 19:29, Igor Berman wrote:
> Hi Aljoscha,
>
> The avro-mapred jar contains the different M/R output formats for Avro, and
> their writers; it's primarily used in M/R jobs that produce Avro output.
> See some details here: https://avro.apache.org/docs/1.7.6/mr.html
>
> I have extracted (kind of copy-pasted + adjusted) some of the classes from
> there to remove this dependency, and it seems to work in the basic scenario.
> I still want to write the files as if they were created by an M/R job, to be
> compatible with this library (e.g. key-value pairs are wrapped in an
> AvroKeyValue object), so that it doesn't matter whether Flink or a regular
> M/R job created the files; their consumers can read them the same way. WDYT?
Re: AvroWriter for Rolling sink
Hi Aljoscha,

The avro-mapred jar contains the different M/R output formats for Avro, and their writers; it's primarily used in M/R jobs that produce Avro output. See some details here: https://avro.apache.org/docs/1.7.6/mr.html

I have extracted (kind of copy-pasted + adjusted) some of the classes from there to remove this dependency, and it seems to work in the basic scenario. I still want to write the files as if they were created by an M/R job, to be compatible with this library (e.g. key-value pairs are wrapped in an AvroKeyValue object), so that it doesn't matter whether Flink or a regular M/R job created the files; their consumers can read them the same way. WDYT?
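A minimal sketch of the wrapper layout Igor describes: avro-mapred's AvroKeyValue stores each pair as an Avro record with "key" and "value" fields, which is why a file written this way looks the same to a consumer whether Flink or a regular M/R job produced it. The record name and JSON layout below are illustrative, built by hand rather than copied from avro-mapred.

```java
// Illustrative only: builds the JSON for a pair-record schema of the kind
// avro-mapred's AvroKeyValue uses, from key and value sub-schemas.
class KeyValuePairSchemaSketch {

	// keySchemaJson / valueSchemaJson are Avro schema JSON fragments,
	// e.g. "\"long\"" for an Avro long.
	static String pairSchemaJson(String keySchemaJson, String valueSchemaJson) {
		return "{\"type\":\"record\",\"name\":\"KeyValuePair\",\"fields\":["
				+ "{\"name\":\"key\",\"type\":" + keySchemaJson + "},"
				+ "{\"name\":\"value\",\"type\":" + valueSchemaJson + "}]}";
	}

	public static void main(String[] args) {
		// Both key and value are Avro longs, as in the example sink setup.
		System.out.println(pairSchemaJson("\"long\"", "\"long\""));
	}
}
```

With the real library one would call AvroKeyValue's schema helper instead of assembling JSON by hand; the point here is only the record-with-two-fields shape.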
Re: AvroWriter for Rolling sink
Hi,
which code did you reuse from there? I asked Robert, and I think it is somewhat problematic to add these somewhat bigger dependencies.

Cheers,
Aljoscha
Re: AvroWriter for Rolling sink
Hi,
it's not a problem, I'll find time to change it (I understand the refactoring is in master and not released yet).
I wanted to ask if it's acceptable to add the following dependency to Flink. My code reuses code in this jar (note that it's not currently present on the Flink classpath):

  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-mapred</artifactId>
    <version>1.7.6</version>
    <classifier>hadoop2</classifier>
  </dependency>
Re: AvroWriter for Rolling sink
Hi,
the code looks very good! Do you think it can be adapted to the slightly modified interface introduced here:
https://issues.apache.org/jira/browse/FLINK-3637

It basically requires the writer to know the write position, so that we can truncate to a valid position in case of failure.

Cheers,
Aljoscha
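The requirement from FLINK-3637 can be sketched without any Flink classes: the writer must always know how many bytes it has handed to the underlying stream, so that after a failure the file can be truncated back to the last known-good offset. This is a minimal hand-written illustration, not Flink's actual Writer interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Wraps a stream and counts every byte written, so a writer built on top
// of it can always report a valid truncation position.
class PositionTrackingStream extends FilterOutputStream {
	private long pos;

	PositionTrackingStream(OutputStream out) {
		super(out);
	}

	@Override
	public void write(int b) throws IOException {
		out.write(b);
		pos++;
	}

	@Override
	public void write(byte[] b, int off, int len) throws IOException {
		out.write(b, off, len);
		pos += len;
	}

	// Everything up to this offset has been handed to the underlying stream.
	long getPos() {
		return pos;
	}

	public static void main(String[] args) throws IOException {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		PositionTrackingStream pts = new PositionTrackingStream(bos);
		pts.write(new byte[]{1, 2, 3}, 0, 3);
		System.out.println(pts.getPos()); // prints 3
	}
}
```

The subtlety for an Avro writer is that the Avro container format buffers blocks internally, so "position" is only valid at sync-marker boundaries; a real implementation has to account for that.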
Re: AvroWriter for Rolling sink
ok,
I have a working prototype already, if somebody is interested (attached).

I might add it as a PR later (with tests etc.)

Tested locally & with S3.

package org.apache.flink.streaming.connectors.fs.avro;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.Map;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericData;
import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueRecordWriter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapred.JobConf;

/**
 * Implementation of AvroKeyValue writer that can be used in Sink.
 *
 * You'll need the following dependency (pay attention to the classifier; it works only for hadoop2):
 * {@code
 *   <dependency>
 *     <groupId>org.apache.avro</groupId>
 *     <artifactId>avro-mapred</artifactId>
 *     <version>1.7.6</version>
 *     <classifier>hadoop2</classifier>
 *   </dependency>
 * }
 *
 * And then:
 * {@code
 *   RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>> sink =
 *       new RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>>("/tmp/path");
 *   sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm"));
 *   Map<String, String> properties = new HashMap<>();
 *   Schema longSchema = Schema.create(Type.LONG);
 *   String keySchema = longSchema.toString();
 *   properties.put("avro.schema.output.key", keySchema);
 *   String valueSchema = longSchema.toString();
 *   properties.put("avro.schema.output.value", valueSchema);
 *   properties.put(FileOutputFormat.COMPRESS, Boolean.toString(true));
 *   properties.put(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());
 *   sink.setWriter(new AvroSinkWriter<AvroKey<Long>, AvroValue<Long>>(properties));
 *   sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
 * }
 *
 * To test with S3, create core-site.xml (I have no other way to test locally):
 * {@code
 *   <property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
 *   <property><name>fs.s3a.access.key</name><value>xxx</value></property>
 *   <property><name>fs.s3a.secret.key</name><value>yyy</value></property>
 *   <property><name>fs.s3a.buffer.dir</name><value>/tmp</value></property>
 * }
 *
 * and add the following dependencies (not sure what the best option here is):
 * {@code
 *   <dependency>
 *     <groupId>org.apache.hadoop</groupId>
 *     <artifactId>hadoop-aws</artifactId>
 *     <version>2.7.0</version>
 *     <scope>provided</scope>
 *     <exclusions>
 *       <exclusion>
 *         <artifactId>guava</artifactId>
 *         <groupId>com.google.guava</groupId>
 *       </exclusion>
 *     </exclusions>
 *   </dependency>
 * }
 */
public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
	private static final long serialVersionUID = 1L;

	private transient FSDataOutputStream outputStream;

	private transient AvroKeyValueRecordWriter<K, V> writer;

	private Class<K> keyClass;

	private Class<V> valueClass;

	private final Map<String, String> properties;

	/**
	 * C'tor for the writer.
	 *
	 * You can provide different properties that will be used to configure the Avro
	 * key-value writer, as a simple properties map (see the example above).
	 * @param properties
	 */
	public AvroSinkWriter(Map<String, String> properties) {
		this.properties = properties;
	}

	private AvroSinkWriter(Class<K> keyClass, Class<V> valueClass, Map<String, String> properties) {
		this.properties = properties;
		this.keyClass = keyClass;
		this.valueClass = valueClass;
	}

	// this is almost copy-paste from AvroOutputFormatBase.getCompressionCodec(..)
	private CodecFactory getCompressionCodec(JobConf
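The attachment is cut off inside getCompressionCodec(..). Judging from the comment above it, the method selects a CodecFactory from the job configuration. Below is an illustrative sketch of that kind of decision, reduced to plain strings; the names and the deflate default are assumptions, not the attachment's actual code.

```java
// Illustrative only: codec selection of the kind the truncated
// getCompressionCodec(..) presumably performs, modeled loosely on
// avro-mapred's AvroOutputFormatBase.
class CodecSelectionSketch {

	static String selectCodec(boolean compressOutput, String configuredCodec) {
		if (!compressOutput) {
			return "null";              // Avro's "no compression" codec name
		}
		if (configuredCodec != null && !configuredCodec.isEmpty()) {
			return configuredCodec;     // e.g. "snappy", "bzip2"
		}
		return "deflate";               // a common default when compressing
	}
}
```

In the real code the return value would be a CodecFactory (e.g. CodecFactory.snappyCodec()) rather than a string.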
Re: AvroWriter for Rolling sink
Hi,
as far as I know there is no one working on this. I'm only aware of someone working on an ORC (from Hive) writer.

This would be a welcome addition! I think you are already on the right track; the only thing required will probably be an AvroFileWriter, and you already started looking at SequenceFileWriter, which should be similar.

Cheers,
Aljoscha
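The shape Aljoscha describes, common to SequenceFileWriter and a would-be AvroFileWriter, is an open/write/close lifecycle over an output stream. This is a hand-written skeleton to show that shape; the interface and method names here are illustrative and do not come from Flink's actual Writer interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative lifecycle a rolling-sink writer follows: the sink opens the
// stream for the current bucket file, hands elements to the writer, and
// closes it when rolling to a new file.
interface RollingWriterSketch<T> {
	void open(ByteArrayOutputStream out) throws IOException;
	void write(T element) throws IOException;
	void close() throws IOException;
}

class StringWriterSketch implements RollingWriterSketch<String> {
	private ByteArrayOutputStream out;

	@Override
	public void open(ByteArrayOutputStream out) {
		this.out = out;
	}

	@Override
	public void write(String element) throws IOException {
		out.write(element.getBytes(StandardCharsets.UTF_8));
	}

	@Override
	public void close() throws IOException {
		out.flush();
	}
}
```

An AvroFileWriter would wire an Avro DataFileWriter into the same three steps instead of writing raw bytes.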
AvroWriter for Rolling sink
Hi All,
Is there such an implementation somewhere? (Before I start to implement it myself; it seems not too difficult, based on the SequenceFileWriter example.)

Anyway, any ideas/pointers will be highly appreciated.

Thanks in advance