Thanks for the mail, Bruno!

On Wed, Jan 18, 2017 at 1:10 AM, Bruno Aranda <brunoara...@gmail.com> wrote:

> Sorry, something went wrong with the code for the Writer. Here it is again:
>
> import org.apache.avro.Schema
> import org.apache.flink.streaming.connectors.fs.Writer
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.parquet.avro.AvroParquetWriter
> import org.apache.parquet.hadoop.ParquetWriter
> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>
> @SerialVersionUID(1L)
> class MyAvroParquetWriter[T](schema: String) extends Writer[T] {
>
>   // ParquetWriter is not serializable, so it is created in open() and
>   // marked @transient; only the schema string travels with the sink.
>   @transient private var writer: ParquetWriter[T] = _
>
>   override def open(fs: FileSystem, path: Path): Unit = {
>     writer = AvroParquetWriter.builder[T](path)
>       .withSchema(new Schema.Parser().parse(schema))
>       .withCompressionCodec(CompressionCodecName.SNAPPY)
>       .build()
>   }
>
>   override def write(element: T): Unit = writer.write(element)
>
>   // The rolling sink calls duplicate() to obtain its own writer instance.
>   override def duplicate(): Writer[T] = new MyAvroParquetWriter[T](schema)
>
>   override def close(): Unit = writer.close()
>
>   // Note: getDataSize includes row groups still buffered in memory, so it
>   // is only an approximation of the bytes actually written to the file.
>   override def getPos: Long = writer.getDataSize
>
>   override def flush(): Long = writer.getDataSize
>
> }
>
> Using this library as a dependency: "org.apache.parquet" % "parquet-avro" %
> "1.8.1". We use this writer in a rolling sink and it seems fine so far.
>
> Cheers,
>
> Bruno
>
> On Wed, 18 Jan 2017 at 09:09 elmosca <brunoara...@gmail.com> wrote:
>
>> Hi Biswajit,
>>
>> We use the following Writer for Parquet with Avro conversion (in Scala):
>>
>>
>>
>> Using this library as a dependency: "org.apache.parquet" % "parquet-avro" %
>> "1.8.1". We use this writer in a rolling sink and it seems fine so far.
>>
>> Cheers,
>>
>> Bruno
>>
>
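
For anyone who wants to try this, a minimal build.sbt fragment could look like
the one below. The Flink version and the connector artifact are assumptions on
my side, not taken from Bruno's setup; adjust them to your project:

// build.sbt (sketch; versions are assumptions)
libraryDependencies ++= Seq(
  // RollingSink and the Writer interface ship in the filesystem connector
  "org.apache.flink" %% "flink-connector-filesystem" % "1.1.4",
  // AvroParquetWriter and CompressionCodecName
  "org.apache.parquet" % "parquet-avro" % "1.8.1"
)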

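And here is a hedged sketch of wiring such a writer into a RollingSink. The
schema, base path, and input stream are made-up placeholders, not Bruno's
actual job:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.RollingSink

// Hypothetical Avro schema for the records being written.
val schemaJson =
  """{"type":"record","name":"Event","fields":[
    |{"name":"id","type":"string"},
    |{"name":"ts","type":"long"}]}""".stripMargin

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[GenericRecord] = ??? // your stream of Avro records

val sink = new RollingSink[GenericRecord]("hdfs:///tmp/events") // made-up path
  .setWriter(new MyAvroParquetWriter[GenericRecord](schemaJson))
  .setBatchSize(128L * 1024 * 1024) // roll to a new part file after ~128 MB

events.addSink(sink)
env.execute("rolling-parquet-example")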