Hi Anuj,

It's been a while since I wrote this (Flink 1.5.2). There may be a better/newer way by now, but this is how I read & write Parquet with hadoop-compatibility:
// imports
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Creating Parquet input format
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
AvroParquetInputFormat<GenericRecord> parquetInputFormat = new AvroParquetInputFormat<>();
AvroParquetInputFormat.setInputDirRecursive(job, true);
AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
HadoopInputFormat<Void, GenericRecord> inputFormat =
    HadoopInputs.createHadoopInput(parquetInputFormat, Void.class, GenericRecord.class, job);

// Creating Parquet output format (SomeEvent.SCHEMA is the Avro schema JSON of the output records)
AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new AvroParquetOutputFormat<>();
AvroParquetOutputFormat.setSchema(job, new Schema.Parser().parse(SomeEvent.SCHEMA));
AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
AvroParquetOutputFormat.setCompressOutput(job, true);
AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
HadoopOutputFormat<Void, GenericRecord> outputFormat =
    new HadoopOutputFormat<>(parquetOutputFormat, job);

// Reading the input files (env is the ExecutionEnvironment)
DataSource<Tuple2<Void, GenericRecord>> inputFileSource = env.createInput(inputFormat);

// Start processing...

// Writing the result as Parquet (resultDataSet is a DataSet<Tuple2<Void, GenericRecord>>)
resultDataSet.output(outputFormat);

Regarding writing partitioned data: as far as I know, there is no way to achieve that with the DataSet API and hadoop-compatibility. You could implement it by reading the input files as a stream and then using the StreamingFileSink with a custom BucketAssigner [1] (see the sketch below). The problem with that approach (which was not yet resolved, AFAIK) is described in [2] under "Important Notice 2". Sadly, for this use case I eventually chose Spark to do the job...
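To make the StreamingFileSink + custom BucketAssigner idea a bit more concrete, here is a rough, untested sketch against the Flink 1.10 APIs (it needs the flink-parquet module for ParquetAvroWriters). The "eventTime" field name (epoch millis), schema, outputPath and resultStream are placeholders you would adapt to your own job:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Buckets each record into a year=.../month=.../day=... folder based on its event-time field.
public class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {

    // Produces bucket ids like "year=2020/month=02/day=15" (UTC).
    private static final DateTimeFormatter PARTITION_FORMAT =
            DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord element, BucketAssigner.Context context) {
        // Assumption: the record carries its event time as epoch millis in an "eventTime" field.
        long eventTimeMillis = (long) element.get("eventTime");
        return PARTITION_FORMAT.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

// Attaching it to the sink; resultStream is your DataStream<GenericRecord>
// (e.g. the input read as a stream plus your transformations):
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forGenericRecord(schema))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();

resultStream.addSink(sink);

The caveat from [2] still applies to this approach, of course.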
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general

Hope this helps.

Rafi

On Sat, Feb 15, 2020 at 5:03 PM aj <ajainje...@gmail.com> wrote:

> Hi Rafi,
>
> I have a similar use case where I want to read parquet files in the
> dataset and want to perform some transformation and similarly want to write
> the result using year month day partitioned.
>
> I am stuck at first step only where how to read and write Parquet files
> using hadoop-Compatability.
>
> Please help me with this and also if u find the solution for how to write
> data in partitioned.
>
> Thanks,
> Anuj
>
> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin <and...@data-artisans.com> wrote:
>
>> Hi Rafi,
>>
>> At the moment I do not see any support of Parquet in DataSet API
>> except HadoopOutputFormat, mentioned in stack overflow question. I have
>> cc'ed Fabian and Aljoscha, maybe they could provide more information.
>>
>> Best,
>> Andrey
>>
>> On 25 Oct 2018, at 13:08, Rafi Aroch <rafi.ar...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'm writing a Batch job which reads Parquet, does some aggregations and
>> writes back as Parquet files.
>> I would like the output to be partitioned by year, month, day by event
>> time. Similarly to the functionality of the BucketingSink.
>>
>> I was able to achieve the reading/writing to/from Parquet by using the
>> hadoop-compatibility features.
>> I couldn't find a way to partition the data by year, month, day to
>> create a folder hierarchy accordingly. Everything is written to a single
>> directory.
>>
>> I could find an unanswered question about this issue:
>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>
>> Can anyone suggest a way to achieve this? Maybe there's a way to
>> integrate the BucketingSink with the DataSet API? Another solution?
>>
>> Rafi
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>