Re: AvroWriter for Rolling sink

2016-04-30 Thread Igor Berman
[FLINK-3854] Support Avro key-value rolling sink writer #1953



Re: AvroWriter for Rolling sink

2016-04-27 Thread Igor Berman
Hi Aljoscha,

the avro-mapred jar contains several M/R output formats for Avro, along with
their writers. It's primarily used in M/R jobs that produce Avro output.
See some details here: https://avro.apache.org/docs/1.7.6/mr.html

I have extracted (copy-pasted, with adjustments) some of the classes from
there to remove this dependency, and it seems to work for the basic scenario.
I still want to write the output as if it had been created by an M/R job, to
stay compatible with this library (e.g. key-value pairs are wrapped into an
AvroKeyValue object). That way it doesn't matter whether Flink or a regular
M/R job created the files: their consumer can read them in the same way.
WDYT?



Re: AvroWriter for Rolling sink

2016-04-27 Thread Aljoscha Krettek
Hi,
which code did you reuse from there? I asked Robert, and I think it is
somewhat problematic to add these rather large dependencies.

Cheers,
Aljoscha



Re: AvroWriter for Rolling sink

2016-04-25 Thread Igor Berman
Hi,
it's not a problem, I'll find time to change it (I understand the
refactoring is in master and not released yet).
I wanted to ask whether it's acceptable to add the following dependency to
Flink. My code reuses code from this jar (note that it is not currently on
the Flink classpath):

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.6</version>
  <classifier>hadoop2</classifier>
</dependency>



Re: AvroWriter for Rolling sink

2016-04-25 Thread Aljoscha Krettek
Hi,
the code looks very good! Do you think it can be adapted to the slightly
modified interface introduced here:
https://issues.apache.org/jira/browse/FLINK-3637

It basically requires the writer to know the write position, so that we can
truncate to a valid position in case of failure.

Cheers,
Aljoscha
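
To illustrate the write-position requirement described above, here is a
minimal sketch. It is not the FLINK-3637 interface itself: it assumes a
local file and java.nio instead of a Hadoop output stream, and the names
PositionAwareWriter, flush() and truncateTo() are hypothetical. flush()
returns the length up to which the file is known to be valid, and recovery
cuts the file back to that length.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TruncateOnRecoveryDemo {

    /** Hypothetical writer that can report how many bytes it has written. */
    static class PositionAwareWriter implements AutoCloseable {
        private final FileChannel channel;

        PositionAwareWriter(Path file) throws IOException {
            // APPEND: a writer reopened after recovery continues at the end of the file.
            this.channel = FileChannel.open(file,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        }

        void write(String record) throws IOException {
            ByteBuffer buf = ByteBuffer.wrap((record + "\n").getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
        }

        /** Forces data to disk and returns the length up to which the file is valid. */
        long flush() throws IOException {
            channel.force(false);
            return channel.size();
        }

        @Override
        public void close() throws IOException {
            channel.close();
        }
    }

    /** Cuts the file back to the position that was recorded at the last checkpoint. */
    static void truncateTo(Path file, long validLength) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.truncate(validLength);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("rolling-sink-demo", ".txt");
        long checkpointed;
        try (PositionAwareWriter writer = new PositionAwareWriter(file)) {
            writer.write("record-1");
            checkpointed = writer.flush(); // position that would be stored with the checkpoint
            writer.write("record-2");      // written after the checkpoint, lost on "failure"
        }
        truncateTo(file, checkpointed);    // recovery: drop everything after the checkpoint
        System.out.println(Files.readAllLines(file)); // prints [record-1]
    }
}
```

A container format such as Avro buffers records into blocks, so a writer
for it would presumably have to flush its current block before reporting
the position; the sketch sidesteps that by writing plain lines.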



Re: AvroWriter for Rolling sink

2016-04-21 Thread Igor Berman
OK,
I already have a working prototype, if anybody is interested (attached).

I might submit it as a PR later (with tests etc.).

Tested locally and with S3.

package org.apache.flink.streaming.connectors.fs.avro;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.util.Map;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericData;
import org.apache.avro.hadoop.file.HadoopCodecFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyValueRecordWriter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapred.JobConf;

/**
 * Implementation of an AvroKeyValue writer that can be used in a sink.
 * 
 You'll need the avro-mapred dependency (pay attention to the classifier; it works only for hadoop2):
 
 {@code
	<dependency>
		<groupId>org.apache.avro</groupId>
		<artifactId>avro-mapred</artifactId>
		<version>1.7.6</version>
		<classifier>hadoop2</classifier>
	</dependency>
}
		
 And then:
  
 {@code
 RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>> sink = new RollingSink<Tuple2<AvroKey<Long>, AvroValue<Long>>>("/tmp/path");
 sink.setBucketer(new DateTimeBucketer("-MM-dd-HH-mm"));
 Map<String, String> properties = new HashMap<>();
 Schema longSchema = Schema.create(Type.LONG);
 String keySchema = longSchema.toString();
 properties.put("avro.schema.output.key", keySchema);
 String valueSchema = longSchema.toString();
 properties.put("avro.schema.output.value", valueSchema);
 properties.put(FileOutputFormat.COMPRESS, Boolean.toString(true));
 properties.put(FileOutputFormat.COMPRESS_CODEC, SnappyCodec.class.getName());

 sink.setWriter(new AvroSinkWriter<AvroKey<Long>, AvroValue<Long>>(properties));
 sink.setBatchSize(1024 * 1024 * 64); // this is 64 MB
 }
 
 
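 To test with S3:

{@code
	Create core-site.xml (I haven't found another way to test locally):

	<configuration>
		<property>
			<name>fs.s3.impl</name>
			<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
		</property>
		<property>
			<name>fs.s3a.access.key</name>
			<value>xxx</value>
		</property>
		<property>
			<name>fs.s3a.secret.key</name>
			<value>yyy</value>
		</property>
		<property>
			<name>fs.s3a.buffer.dir</name>
			<value>/tmp</value>
		</property>
	</configuration>

	and add the following dependency (not sure what the best option is here):

	<dependency>
		<groupId>org.apache.hadoop</groupId>
		<artifactId>hadoop-aws</artifactId>
		<version>2.7.0</version>
		<scope>provided</scope>
		<exclusions>
			<exclusion>
				<artifactId>guava</artifactId>
				<groupId>com.google.guava</groupId>
			</exclusion>
		</exclusions>
	</dependency>

 }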
 
 */
public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
	private static final long serialVersionUID = 1L;

	private transient FSDataOutputStream outputStream;

	private transient AvroKeyValueRecordWriter<K, V> writer;

	private Class keyClass;

	private Class valueClass;

	private final Map<String, String> properties;

	/**
	 * Constructor for the writer.
	 * 
	 * The properties map is used to configure the Avro key-value writer (see the example above).
	 *
	 * @param properties configuration properties for the Avro key-value writer
	 */
	public AvroSinkWriter(Map<String, String> properties) {
		this.properties = properties;
	}

	private AvroSinkWriter(Class keyClass, Class valueClass, Map<String, String> properties) {
		this.properties = properties;
		this.keyClass = keyClass;
		this.valueClass = valueClass;
	}

	//this is almost copy-paste from AvroOutputFormatBase.getCompressionCodec(..)
	private CodecFactory getCompressionCodec(JobConf 

Re: AvroWriter for Rolling sink

2016-04-21 Thread Aljoscha Krettek
Hi,
as far as I know there is no one working on this. I'm only aware of someone
working on an ORC (from Hive) Writer.

This would be a welcome addition! I think you are already on the right
track, the only thing required will probably be an AvroFileWriter and you
already started looking at SequenceFileWriter, which should be similar.

Cheers,
Aljoscha
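
As a rough illustration of what such a format-specific writer involves,
here is a minimal sketch of the open/write/flush/close/duplicate life
cycle. SimpleWriter is a hypothetical, simplified stand-in and not Flink's
actual Writer interface. LongPairWriter is equally made up: it writes raw
longs where an Avro writer would encode records.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class KeyValueWriterSketch {

    /** Hypothetical, simplified stand-in for the writer interface a rolling sink plugs in. */
    interface SimpleWriter<T> {
        void open(OutputStream out) throws IOException;
        void write(T element) throws IOException;
        void flush() throws IOException;
        void close() throws IOException;
        SimpleWriter<T> duplicate(); // fresh, unopened copy for another sink instance
    }

    /** Writes every (key, value) pair as two big-endian longs, i.e. 16 bytes per element. */
    static class LongPairWriter implements SimpleWriter<Map.Entry<Long, Long>> {
        private DataOutputStream out;

        @Override
        public void open(OutputStream stream) {
            out = new DataOutputStream(stream);
        }

        @Override
        public void write(Map.Entry<Long, Long> element) throws IOException {
            out.writeLong(element.getKey());
            out.writeLong(element.getValue());
        }

        @Override
        public void flush() throws IOException {
            out.flush();
        }

        @Override
        public void close() throws IOException {
            if (out != null) {
                out.close();
                out = null;
            }
        }

        @Override
        public SimpleWriter<Map.Entry<Long, Long>> duplicate() {
            return new LongPairWriter();
        }
    }

    /** Drives one writer through its whole life cycle, roughly as a sink would for one file. */
    static byte[] writeAll(SimpleWriter<Map.Entry<Long, Long>> template,
                           List<Map.Entry<Long, Long>> elements) throws IOException {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        SimpleWriter<Map.Entry<Long, Long>> writer = template.duplicate();
        writer.open(target);
        for (Map.Entry<Long, Long> element : elements) {
            writer.write(element);
        }
        writer.flush();
        writer.close();
        return target.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        List<Map.Entry<Long, Long>> pairs = new ArrayList<>();
        pairs.add(new SimpleEntry<>(1L, 10L));
        pairs.add(new SimpleEntry<>(2L, 20L));
        byte[] bytes = writeAll(new LongPairWriter(), pairs);
        System.out.println(bytes.length); // prints 32
    }
}
```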



AvroWriter for Rolling sink

2016-04-21 Thread Igor Berman
Hi All,
Is there an implementation of an Avro writer for the rolling sink somewhere?
(I'm asking before I start to implement it myself; it doesn't seem too
difficult based on the SequenceFileWriter example.)

Anyway, any ideas/pointers will be highly appreciated.

Thanks in advance