Hi Alex/Jeff,
Yes, it did help. However, there are other issues I am running into. In my case the Parquet files are produced by a different source, and the field names contain the infamous '-' and '.' characters, which the telco/3GPP world uses. The files were generated with Python pandas/pyarrow, which did not care or complain. Now I have issues building an Avro schema, and I don't know if there is a way to circumvent this, for example by disabling field-name validation.

Eventually I need to read multiple Parquet files, group them based on some criteria, and expand and combine them into one big Parquet file on a daily basis. The source provides 15-minute Parquet chunks.

Any suggestions here will be helpful.

Thanks,
Sri

________________________________
From: Alexey Romanenko <[email protected]>
Sent: Friday, January 25, 2019 10:31:37 AM
To: [email protected]
Subject: Re: ParquetIO write of CSV document data

Hi,

Great that this example helped you. Also, as I can see, Jeff Klukas already answered your questions in the other thread “How to disable sharding with FileIO.write()/FileIO.writeDynamic”.

On 24 Jan 2019, at 03:44, Sridevi Nookala <[email protected]> wrote:

Hi Alex,

Thanks for the suggestion. I tried it as in the GitHub example, by inferring the Avro schema:

    PCollection<String> input = pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
    input
        .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
        .setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));
    pipeline.run();

I have a few simple questions:
1. How can I disable sharding with FileIO.write()? I want a single Parquet file from a single CSV.
2. How can I change the above to use a custom name for my Parquet file?
3. Do I have to use FileIO.writeDynamic()?
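For the daily roll-up of 15-minute chunks, one piece that can be shown without a Beam runtime is working out which chunk files belong to which day. The sketch below assumes a hypothetical naming scheme with a yyyyMMdd_HHmm timestamp in each file name (e.g. pm_20190125_1015.parquet); the class and method names are illustrative only. Each day's group, or an equivalent daily glob such as /data/pm_20190125_*.parquet, could then be read roughly via ParquetIO.read(SCHEMA).from(...) and written back through FileIO.write() with withNumShards(1), which should yield a single output file and also bears on the single-file question above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: groups 15-minute Parquet chunk paths by the day encoded
// in their file names. ASSUMPTION: names end in _yyyyMMdd_HHmm.parquet, e.g.
// "pm_20190125_1015.parquet"; adjust CHUNK_NAME to the real naming scheme.
final class DailyChunkGrouper {
    private static final Pattern CHUNK_NAME =
        Pattern.compile(".*_(\\d{8})_\\d{4}\\.parquet");

    // Returns day (yyyyMMdd) -> chunk paths for that day; days and paths are sorted.
    static Map<String, List<String>> groupByDay(List<String> paths) {
        Map<String, List<String>> byDay = new TreeMap<>();
        for (String path : paths) {
            Matcher m = CHUNK_NAME.matcher(path);
            if (!m.matches()) {
                continue; // skip files that do not follow the assumed pattern
            }
            byDay.computeIfAbsent(m.group(1), day -> new ArrayList<>()).add(path);
        }
        for (List<String> chunks : byDay.values()) {
            Collections.sort(chunks); // chronological within a day for this pattern
        }
        return byDay;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList(
            "/data/pm_20190125_0015.parquet",
            "/data/pm_20190124_2345.parquet",
            "/data/pm_20190125_0000.parquet",
            "/data/README.txt");
        groupByDay(paths).forEach((day, chunks) -> System.out.println(day + " -> " + chunks));
    }
}
```

Grouping on file names keeps the decision outside the pipeline; if the day is only known from the data itself, a GroupByKey on a day key inside the pipeline would be the alternative.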
Hi Łukasz,

I am a newbie, so I am not yet at the point of solving Beam Jiras, but it would help immensely if the need for Avro schema inference could be avoided, the way Python pandas/pyarrow does it.

Thanks for your help,
Sri

________________________________
From: Sridevi Nookala
Sent: Wednesday, January 23, 2019 9:41:02 PM
To: [email protected]
Subject: Re: ParquetIO write of CSV document data

Hi Alex,

Thanks for the suggestion. I tried it as in the GitHub example, by inferring the Avro schema:

    PCollection<String> input = pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
    input
        .apply("Produce Avro records", ParDo.of(new DeterministicallyConstructAvroRecordsFn()))
        .setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));
    pipeline.run();

________________________________
From: Łukasz Gajowy <[email protected]>
Sent: Tuesday, January 15, 2019 7:02:56 AM
To: [email protected]
Subject: Re: ParquetIO write of CSV document data

Hi Sri,

It's exactly as Alexey says, although there are plans/ideas to improve ParquetIO in a way that would not require defining the schema manually. Some Jiras on this topic that might be interesting but are not yet resolved (maybe you are willing to contribute?):
https://issues.apache.org/jira/browse/BEAM-4454
https://issues.apache.org/jira/browse/BEAM-4812
https://issues.apache.org/jira/browse/BEAM-6394

Thanks,
Łukasz

On Mon, 14 Jan 2019 at 19:16, Alexey Romanenko <[email protected]> wrote:

Hi Sri,

AFAIK, you have to create a PCollection of GenericRecords and define your Avro schema manually to write your data into Parquet files. In this case, you will need to create a ParDo for this translation.
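Alexey's suggestion, a ParDo that turns each CSV line into a GenericRecord, boils down to pairing header names with cell values. A minimal, stdlib-only sketch of that step follows; the class name CsvLineMapper is hypothetical, and it assumes plain comma-separated lines without quoted fields or embedded commas, plus header names that are already valid Avro names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the per-line translation a ParDo could perform before
// filling an Avro GenericRecord. ASSUMPTIONS: comma-separated values, no quoted
// fields or embedded commas, and the same column order on every data line.
final class CsvLineMapper {
    private final String[] fieldNames;

    CsvLineMapper(String headerLine) {
        this.fieldNames = headerLine.split(",", -1);
    }

    // Returns field name -> value in header order; empty cells become null.
    Map<String, String> toFieldValues(String line) {
        String[] cells = line.split(",", -1); // -1 keeps trailing empty cells
        if (cells.length != fieldNames.length) {
            throw new IllegalArgumentException(
                "expected " + fieldNames.length + " cells but got " + cells.length + ": " + line);
        }
        Map<String, String> values = new LinkedHashMap<>();
        for (int i = 0; i < cells.length; i++) {
            values.put(fieldNames[i], cells[i].isEmpty() ? null : cells[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        CsvLineMapper mapper = new CsvLineMapper("cell_id,rsrp,rsrq");
        System.out.println(mapper.toFieldValues("4711,-95,"));
    }
}
```

Inside a real DoFn, the resulting map could be copied into an org.apache.avro.generic.GenericRecordBuilder created from SCHEMA, using set(name, value) and then build(). That only works as-is if every field in SCHEMA is a (nullable) string; other types would need converting first.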
Also, I expect that your schema is the same for all CSV files. A basic example of using the Parquet sink with the Java SDK can be found here [1].

[1] https://git.io/fhcfV

On 14 Jan 2019, at 02:00, Sridevi Nookala <[email protected]> wrote:

Hi,

I have a bunch of CSV data files that I need to store in Parquet format. I looked at the basic documentation on ParquetIO, and ParquetIO.sink() can be used to achieve this. However, there is a dependency on the Avro schema. How do I infer/generate an Avro schema from CSV document data? Does Beam have an API for this?

I tried using the Kite SDK API CSVUtil/JsonUtil but had no luck generating an Avro schema. My CSV data files have headers, and quite a few of the header fields are hyphenated, which Kite's CSVUtil does not like. I think it would be redundant effort to convert the CSV documents to JSON documents.

Any suggestions on how to infer an Avro schema from CSV data or from a JSON schema will be helpful.

Thanks,
Sri
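Since the replies in this thread say the schema has to be defined manually, one workaround for hyphenated (or dotted) headers is to generate the schema from the header line yourself, rewriting each name into a legal Avro name, which must match [A-Za-z_][A-Za-z0-9_]*. The sketch below is an illustration under stated assumptions, not a Beam or Avro API; CsvHeaderSchema and its methods are hypothetical. Every column becomes a nullable string, illegal characters become '_', names starting with a digit get a leading '_', collisions get a numeric suffix, and the original header is kept in the field's "doc" attribute.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: builds an Avro record schema (as JSON text) from a CSV
// header line. ASSUMPTIONS: all columns are nullable strings; only quotes and
// backslashes are escaped in JSON strings (enough for typical header names).
final class CsvHeaderSchema {

    // Rewrites a raw header into a legal Avro name: [A-Za-z_][A-Za-z0-9_]*.
    static String toAvroName(String raw) {
        String name = raw.trim().replaceAll("[^A-Za-z0-9_]", "_");
        if (name.isEmpty() || Character.isDigit(name.charAt(0))) {
            name = "_" + name; // Avro names may not start with a digit
        }
        return name;
    }

    static String jsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String inferSchemaJson(String recordName, String headerLine) {
        Set<String> used = new HashSet<>();
        StringBuilder json = new StringBuilder();
        json.append("{\"type\":\"record\",\"name\":").append(jsonString(recordName))
            .append(",\"fields\":[");
        String[] headers = headerLine.split(",", -1);
        for (int i = 0; i < headers.length; i++) {
            String base = toAvroName(headers[i]);
            String name = base;
            // "a-b" and "a.b" would both become "a_b"; suffix later duplicates.
            for (int n = 2; !used.add(name); n++) {
                name = base + "_" + n;
            }
            if (i > 0) {
                json.append(',');
            }
            json.append("{\"name\":").append(jsonString(name))
                .append(",\"type\":[\"null\",\"string\"],\"default\":null,\"doc\":")
                .append(jsonString(headers[i])).append('}');
        }
        return json.append("]}").toString();
    }

    public static void main(String[] args) {
        System.out.println(inferSchemaJson("CsvRecord", "cell-id,pm.rrc.conn,5g_flag"));
    }
}
```

The resulting JSON string could be handed to new Schema.Parser().parse(json) from the Avro library to obtain the Schema for ParquetIO.sink(...). This helps only where you build the records yourself, as in the CSV case; reading existing Parquet files whose column names contain '-' or '.' through an Avro schema is likely to run into the same naming restriction.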
