Hi,

Great that this example helped you. Also, as I can see, Jeff Klukas already answered your questions in the other thread, “How to disable sharding with FileIO.write()/FileIO.writeDynamic”.
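For reference, the gist is to set the number of shards to 1 and pass your own file naming. A minimal, untested sketch building on the write step from your example below (the "output"/".parquet" prefix and suffix are just illustrative, and forcing a single shard means the whole write happens on one worker):

    // Same PCollection<GenericRecord> as in your pipeline below.
    records.apply("Write Parquet files",
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(SCHEMA))
            .to("/tmp/parquet/")
            // One shard => one output file (at the cost of write parallelism).
            .withNumShards(1)
            // Custom prefix/suffix; shard (and window/pane) info is still appended by defaultNaming.
            .withNaming(FileIO.Write.defaultNaming("output", ".parquet")));

For a fully custom file name you can implement FileIO.Write.FileNaming yourself. FileIO.writeDynamic() is only needed when you want to route records to different destinations, not just for naming.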
> On 24 Jan 2019, at 03:44, Sridevi Nookala <snook...@parallelwireless.com> wrote:
> 
> Hi Alex,
> 
> Thanks for the suggestion. I tried it like in the GitHub example, by inferring the Avro schema:
> 
> PCollection<String> input =
>     pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
> input
>     .apply("Produce Avro records",
>         ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
>     .setCoder(AvroCoder.of(SCHEMA))
>     .apply("Write Parquet files",
>         FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));
> 
> pipeline.run();
> 
> I have 2 simple questions:
> How can I disable sharding with FileIO.write()? I want a single Parquet file from a single CSV.
> How can I change the above to have custom naming for my Parquet file?
> Do I have to use FileIO.writeDynamic()?
> 
> Hi Lucas,
> 
> I am a newbie, so not there yet to solve Beam Jiras, but it would help immensely if Avro schema inference could be avoided,
> something like Python pandas/pyarrow does.
> 
> Thanks for your help,
> Sri
> 
> From: Sridevi Nookala
> Sent: Wednesday, January 23, 2019 9:41:02 PM
> To: user@beam.apache.org
> Subject: Re: ParquetIO write of CSV document data
> 
> Hi Alex,
> 
> Thanks for the suggestion. I tried it like in the GitHub example, by inferring the Avro schema:
> 
> PCollection<String> input =
>     pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
> input
>     .apply("Produce Avro records",
>         ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
>     .setCoder(AvroCoder.of(SCHEMA))
>     .apply("Write Parquet files",
>         FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));
> 
> pipeline.run();
> 
> From: Łukasz Gajowy <lukasz.gaj...@gmail.com>
> Sent: Tuesday, January 15, 2019 7:02:56 AM
> To: user@beam.apache.org
> Subject: Re: ParquetIO write of CSV document data
> 
> Hi Sri,
> 
> It's exactly as Alexey says, although there are plans/ideas to improve ParquetIO in a way that would not require defining the schema manually.
> 
> Some Jiras that might be interesting on this topic but are not yet resolved (maybe you are willing to contribute?):
> https://issues.apache.org/jira/browse/BEAM-4454
> https://issues.apache.org/jira/browse/BEAM-4812
> https://issues.apache.org/jira/browse/BEAM-6394
> 
> Thanks,
> Łukasz
> 
> On Mon, 14 Jan 2019 at 19:16, Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Hi Sri,
> 
> Afaik, you have to create a PCollection of GenericRecords and define your Avro schema manually to write your data into Parquet files.
> In this case, you will need to create a ParDo for this translation. Also, I expect that your schema is the same for all CSV files.
> 
> A basic example of using the Parquet sink with the Java SDK can be found here [1].
> 
> [1] https://git.io/fhcfV
> 
>> On 14 Jan 2019, at 02:00, Sridevi Nookala <snook...@parallelwireless.com> wrote:
>> 
>> Hi,
>> 
>> I have a bunch of CSV data files that I need to store in Parquet format. I did look at the basic documentation on ParquetIO, and ParquetIO.sink() can be used to achieve that.
>> However, there is a dependency on the Avro schema.
>> How do I infer/generate an Avro schema from CSV document data?
>> Does Beam have any API for the same?
>> I tried using the Kite SDK API (CSVUtil / JsonUtil) but had no luck generating an Avro schema;
>> my CSV data files have headers in them, and quite a few of the header fields are hyphenated, which Kite's CSVUtil does not like.
>> 
>> I think it would be a redundant effort to convert the CSV documents to JSON documents.
>> Any suggestions on how to infer an Avro schema from CSV data or a JSON schema would be helpful.
>> 
>> Thanks,
>> Sri
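P.S. Regarding the schema inference question above: until the Jiras Łukasz mentioned are resolved, the schema has to be written by hand and the CSV-to-GenericRecord translation done in a ParDo. Below is a rough, untested sketch of what that can look like; the schema, the field names and the naive comma split are purely illustrative, and a real CSV parser should be used for quoted fields. Note also that Avro field names may only contain letters, digits and underscores, so hyphenated CSV headers have to be renamed (e.g. hyphens mapped to underscores) when you define the schema, which is probably why Kite's CSVUtil rejects them.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;

// Inside your pipeline class: a hand-written schema (field names/types made up for the example).
static final Schema SCHEMA = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"CsvRow\",\"fields\":["
        + "{\"name\":\"cell_id\",\"type\":\"string\"},"
        + "{\"name\":\"value\",\"type\":\"string\"}]}");

// The translation ParDo used in your pipeline: one CSV line -> one GenericRecord.
static class DeterministicallyConstructAvroRecordsFn extends DoFn<String, GenericRecord> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Naive split; use a proper CSV parser (e.g. commons-csv) for real data.
    String[] cols = c.element().split(",", -1);
    GenericRecord record = new GenericData.Record(SCHEMA);
    record.put("cell_id", cols[0]);
    record.put("value", cols[1]);
    c.output(record);
  }
}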