Hi Anuj,

It's been a while since I wrote this (Flink 1.5.2). Could be a better/newer
way, but this is what how I read & write Parquet with hadoop-compatibility:

// imports
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
>
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;

import org.apache.flink.hadoopcompatibility.HadoopInputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.parquet.avro.AvroParquetInputFormat;
>
> // Creating Parquet input format
> Configuration conf = new Configuration();
> Job job = Job.getInstance(conf);
> AvroParquetInputFormat<GenericRecord> parquetInputFormat = new
> AvroParquetInputFormat<>();
> AvroParquetInputFormat.setInputDirRecursive(job, true);
> AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> HadoopInputFormat<Void, GenericRecord> inputFormat
> = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
> GenericRecord.class, job);
>


> // Creating Parquet output format
> AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new
> AvroParquetOutputFormat<>();
> AvroParquetOutputFormat.setSchema(job, new
> Schema.Parser().parse(SomeEvent.SCHEMA));
> AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
> AvroParquetOutputFormat.setCompressOutput(job, true);
> AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> HadoopOutputFormat<Void, GenericRecord> outputFormat = new
> HadoopOutputFormat<>(parquetOutputFormat, job);



DataSource<Tuple2<Void, GenericRecord>> inputFileSource =
> env.createInput(inputFormat);



// Start processing...



// Writing result as Parquet
> resultDataSet.output(outputFormat);


Regarding writing partitioned data, as far as I know, there is no way to
achieve that with the DataSet API with hadoop-compatibility.

You could implement this with reading from input files as stream and then
using StreamingFileSink with a custom BucketAssigner [1].
The problem with that (which was not yet resolved AFAIK) is described here
[2] in "Important Notice 2".

Sadly I say, that eventually, for this use-case I chose Spark to do the
job...

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general

Hope this helps.

Rafi


On Sat, Feb 15, 2020 at 5:03 PM aj <ajainje...@gmail.com> wrote:

> Hi Rafi,
>
> I have a similar use case where I want to read parquet files in the
> dataset and want to perform some transformation and similarly want to write
> the result using year month day partitioned.
>
> I am stuck at first step only where how to read and write Parquet files
> using hadoop-Compatability.
>
> Please help me with this and also if u find the solution for how to write
> data in partitioned.
>
> Thanks,
> Anuj
>
>
> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <and...@data-artisans.com>
> wrote:
>
>> Hi Rafi,
>>
>> At the moment I do not see any support of Parquet in DataSet API
>> except HadoopOutputFormat, mentioned in stack overflow question. I have
>> cc’ed Fabian and Aljoscha, maybe they could provide more information.
>>
>> Best,
>> Andrey
>>
>> On 25 Oct 2018, at 13:08, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm writing a Batch job which reads Parquet, does some aggregations and
>> writes back as Parquet files.
>> I would like the output to be partitioned by year, month, day by event
>> time. Similarly to the functionality of the BucketingSink.
>>
>> I was able to achieve the reading/writing to/from Parquet by using the
>> hadoop-compatibility features.
>> I couldn't find a way to partition the data by year, month, day to create
>> a folder hierarchy accordingly. Everything is written to a single directory.
>>
>> I could find an unanswered question about this issue:
>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>
>> Can anyone suggest a way to achieve this? Maybe there's a way to
>> integrate the BucketingSink with the DataSet API? Another solution?
>>
>> Rafi
>>
>>
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

Reply via email to