Thank you Alexey for the review and great suggestions. -B
On Wed, Mar 17, 2021 at 12:07 PM Alexey Romanenko <[email protected]> wrote:

> Thank you for your contribution, Bashir!
>
> Alexey
>
> On 17 Mar 2021, at 15:32, Bashir Sadjad <[email protected]> wrote:
>
> To close the loop here: The fix is merged and a configuration parameter is
> added to ParquetIO.Sink now for rowGroupSize (which is not a great name
> <https://github.com/apache/beam/pull/14227#discussion_r595409910>, BTW).
>
> Thanks
>
> -B
>
> On Fri, Mar 12, 2021 at 11:55 AM David Hollands <[email protected]> wrote:
>
>> Tbh mate, I reckon it would be quicker if you progress your PR.
>>
>> Cheers,
>> David
>>
>> ------------------------------
>> *From:* Bashir Sadjad <[email protected]>
>> *Sent:* 12 March 2021 16:29
>> *To:* [email protected] <[email protected]>
>> *Subject:* Re: Setting rowGroupSize in ParquetIO
>>
>> Thanks David. Yes, I looked at passing it through the HadoopConfiguration,
>> but it seems the row-group size is not there, or at least
>> ParquetWriter.Builder sets it directly from its rowGroupSize property. I
>> filed BEAM-11969 <https://issues.apache.org/jira/browse/BEAM-11969> for
>> this, so if you can contribute your patch for plumbing this through, that
>> would be great. Otherwise, I can send a PR.
>>
>> Regards
>>
>> -B
>>
>> On Fri, Mar 12, 2021 at 8:25 AM David Hollands <[email protected]> wrote:
>>
>> Hi Bashir,
>>
>> I think it is just a case of somebody bothering to plumb it in
>> explicitly, e.g.
>>
>> /** Specifies row group size. By default, DEFAULT_BLOCK_SIZE. */
>> public Sink withRowGroupSize(int rowGroupSize) {
>>   return toBuilder().setRowGroupSize(rowGroupSize).build();
>> }
>>
>> and
>>
>> this.writer =
>>     AvroParquetWriter.<GenericRecord>builder(beamParquetOutputFile)
>>         .withRowGroupSize(getRowGroupSize()) // Ze patch to set RowGroupSize
>>         .withSchema(schema)
>>         .withCompressionCodec(getCompressionCodec())
>>         .withWriteMode(OVERWRITE)
>>         .build();
>>
>> etc.
>>
>> *However, it might be worth exploring whether it can be set via the
>> HadoopConfiguration "parquet.block.size" property, but I'm not sure that
>> it actually can.*
>>
>> We patched in something explicitly last year but didn't contribute it
>> upstream, as there was quite a bit of activity on ParquetIO (e.g. the
>> conversion to SDF) at the time.
>>
>> The use case we had at the time was that some downstream consumers of the
>> Parquet files (AWS S3 Select) couldn't handle row group sizes > 64MB
>> uncompressed. I'm sure there are other use cases out there that need this
>> fine-grained control.
>>
>> Cheers, David
>>
>> *David Hollands*
>> BBC Broadcast Centre, London, W12
>> Email: [email protected]
>>
>> *From: *Bashir Sadjad <[email protected]>
>> *Reply to: *"[email protected]" <[email protected]>
>> *Date: *Friday, 12 March 2021 at 07:58
>> *To: *"[email protected]" <[email protected]>
>> *Subject: *Setting rowGroupSize in ParquetIO
>>
>> Hi all,
>>
>> I wonder how I can set the row group size for files generated by
>> ParquetIO.Sink
>> <https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/parquet/ParquetIO.Sink.html>.
>> It doesn't seem to provide the option for setting that, and IIUC from the
>> code
>> <https://github.com/apache/beam/blob/fffb85a35df6ae3bdb2934c077856f6b27559aa7/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L1117>,
>> it uses the default value in ParquetWriter.Builder
>> <https://github.com/apache/parquet-mr/blob/bdf935a43bd377c8052840a4328cf5b7603aa70a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java#L636>,
>> which is 128MB. Is there any reason not to expose this parameter in
>> ParquetIO?
>>
>> Thanks
>>
>> -B
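
[Editor's note: for readers landing on this thread later, a minimal sketch of how the merged rowGroupSize parameter discussed above can be used via FileIO. This assumes a Beam release that includes the fix from apache/beam PR #14227; the output path and class name are placeholders, so verify the exact method name against the Javadoc of your Beam version.]

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

public class ParquetRowGroupSizeExample {
  public static void write(PCollection<GenericRecord> records, Schema schema) {
    records.apply(
        FileIO.<GenericRecord>write()
            .via(
                ParquetIO.sink(schema)
                    // Cap row groups at 64MB uncompressed (e.g. for consumers
                    // such as AWS S3 Select, per the use case in this thread);
                    // the default is ParquetWriter's 128MB block size.
                    .withRowGroupSize(64 * 1024 * 1024))
            .to("/path/to/output")); // placeholder path
  }
}
```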
