Hi Alex,

Thanks for the suggestion. I tried like in the github example by infering AVRO 
schema,


 PCollection<String> input =
            pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
       input
        .apply("Produce Avro records", ParDo.of(new 
DeterministicallyConstructAvroRecordsFn()))
.setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            
FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

    pipeline.run();
  }


have 2 simple questions

  1.  how can i disable sharding with FileIO.write().  i want a single parquet 
file from a single CSV
  2.   how can i change the above to have a custom naming for my parquet file
  3.  do i have to use FileIO.writeDynamic() ?

Hi Lucas,

I am newbie so not there yet to solve BEAM jira's , but it will help immensly 
if AVRO scehma inference is avoided
some thing like python pandas/pyarrow does

thanks for your help
Sri

________________________________
From: Sridevi Nookala
Sent: Wednesday, January 23, 2019 9:41:02 PM
To: [email protected]
Subject: Re: ParquetIO write of CSV document data


Hi Alex,


Thanks for the suggestion. I tried like in the github example by infering AVRO 
schema,


 PCollection<String> input =
            pipeline.apply(TextIO.read().from("/tmp/beam/input.csv"));
       input
        .apply("Produce Avro records", ParDo.of(new 
DeterministicallyConstructAvroRecordsFn()))
.setCoder(AvroCoder.of(SCHEMA))
        .apply(
            "Write Parquet files",
            
FileIO.<GenericRecord>write().via(ParquetIO.sink(SCHEMA)).to("/tmp/parquet/"));

    pipeline.run();
  }


________________________________
From: Łukasz Gajowy <[email protected]>
Sent: Tuesday, January 15, 2019 7:02:56 AM
To: [email protected]
Subject: Re: ParquetIO write of CSV document data

Hi Sri,

it's exactly as Alexey says, although there are plans/ideas to improve 
ParquetIO in a way that would not require defining the schema manually.

Some Jiras that might be interesting in this topic but not yet resolved (maybe 
you are willing to contribute?):
https://issues.apache.org/jira/browse/BEAM-4454<https://issues.apache.org/jira/browse/BEAM-4454>
https://issues.apache.org/jira/browse/BEAM-4812<https://issues.apache.org/jira/browse/BEAM-4812>
https://issues.apache.org/jira/browse/BEAM-6394<https://issues.apache.org/jira/browse/BEAM-6394>

Thanks,
Łukasz

pon., 14 sty 2019 o 19:16 Alexey Romanenko 
<[email protected]<mailto:[email protected]>> napisał(a):
Hi Sri,

Afaik, you have to create “PCollection" of "GenericRecord”s and define your 
Avro schema manually to write your data into Parquet files.
In this case, you will need to create a ParDo for this translation. Also, I 
expect that your schema is the same for all CSV files.

Basic example of using Parquet Sink with Java SDK could be found here [1]

[1] https://git.io/fhcfV<https://git.io/fhcfV>


On 14 Jan 2019, at 02:00, Sridevi Nookala 
<[email protected]<mailto:[email protected]>> wrote:

hi,

I have a bunch of CSV data files that i need to store in Parquet format. I did 
look at basic documentation on ParquetIO. and ParquetIO.sink() can be used to 
achive the same.
However there is a dependency on the Avro Schema.
how do i infer/generate Avro schema from CSV document data ?
Does beam have any API for the same.
I tried using Kite SDK API CSVUtil / JsonUtil but had no luck generating avro 
schema
my CSV data files have headers in them and quite a few of the header fields are 
hyphenated which are not liked by Kite 's CSVUtil

I think it will be a redundant effort to convert CSV documents to json 
documents .
Any suggestions on how to infer avro schema from CSV data or a JSON schema will 
be helpful

thanks
Sri

Reply via email to