Hi Csaba,
I deeply appreciate your help and support!
Thanks so much; it is now running correctly and using the good practices.
Let me take advantage of this email to ask about another step in the same pipeline.
The CSV files will be converted to .avro, and I have two different schemas:
FTP files matching *02*.csv use *schema_02.avsc*, and
files matching *01*.csv use *schema_01.avsc*.
Each schema has a different structure.
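So the mapping from the file name to the schema file is essentially this (resolveSchemaName is just an illustrative name for it):

static String resolveSchemaName(String csvFileName) {
    // Illustrative only: pick the schema file from the CSV file-name pattern.
    return csvFileName.contains("02") ? "schema_02.avsc" : "schema_01.avsc";
}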
My attempt is below:
class FtpGcsDoFn extends DoFn<FtpInput, String> {

    @ProcessElement
    public void processElement(@Element FtpInput f, OutputReceiver<String> outputReceiver, ProcessContext c) {
        ArgsOptions option = c.getPipelineOptions().as(ArgsOptions.class);
        ....
        String pathSavedFile = saveFile(download, f.getName(), option);
        outputReceiver.output(pathSavedFile); // save on GCS
        option.setCurrentSchema(String.format("schema_%s.avsc", f.getName()));
    }
...
.apply("Ftp download", ParDo.of( FtpTransferFileFn.from(options
.getFtpHost()))
.apply("Read csv"
,TextIO.readAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
.apply("Read line", ParDo.of(new DoFn<String, String>(){
@ProcessElement
public void processElement(ProcessContext c) {
ArgsOptions opt= c.getPipelineOptions().as(ArgsOptions.
class);
String nameFileSchema=
opt.getCurrentSchema().get();
** This method Not working, return always the last option.setCurrentSchema(
...) in class FtpGcsDoFn
Schema schema = new
Schema.Parser().parse(schema);
String[] rowValues = c.element().split(","
);
// Create Avro Generic Record
GenericRecord genericRecord = new
GenericData.Record(schema);
for (int index = 0; index < fields.size(); ++index)
{
Schema.Field field = fields.get(index);
genericRecord.put(field.name(), rowValues[
index]);
}
c.output(genericRecord); // NO serialized
GenericRecord :'(
}
}))
.apply("Write Avro formatted data", AvroIO
.writeGenericRecords(SCHEMA_DINAMIC ?? ) // How I get different schema...
:`(
.to(options.getOutput()).withCodec(
CodecFactory.snappyCodec()).withSuffix(".avro"));
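
From what I understand, pipeline options are fixed when the job is submitted, so calling option.setCurrentSchema(...) inside a DoFn cannot pass state to other steps or workers; each worker only sees whatever value was set last in its own JVM, which would explain the behaviour above. One idea, assuming only these two fixed schemas, is to branch on the file name before reading, so each branch works with a single static schema and an explicit AvroCoder (which should also fix the "not serialized" GenericRecord). The sketch below is untested; CsvToRecordFn is an illustrative helper, input is the PCollection<FtpInput> from the earlier step, and getOutput() is assumed to return a plain String:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

// Avro's Schema is not Serializable, so the DoFn keeps the schema as its
// JSON string and parses it lazily on the worker in @Setup.
class CsvToRecordFn extends DoFn<String, GenericRecord> {
    private final String schemaJson;
    private transient Schema schema;

    CsvToRecordFn(String schemaJson) { this.schemaJson = schemaJson; }

    @Setup
    public void setup() {
        schema = new Schema.Parser().parse(schemaJson);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] values = c.element().split(",");
        GenericRecord record = new GenericData.Record(schema);
        List<Schema.Field> fields = schema.getFields();
        for (int i = 0; i < fields.size() && i < values.length; i++) {
            record.put(fields.get(i).name(), values[i]);
        }
        c.output(record);
    }
}

// At pipeline construction time, read the schema once
// (throws IOException; handle or declare as needed):
String schema01Json = new String(
    Files.readAllBytes(Paths.get("schema_01.avsc")), StandardCharsets.UTF_8);
Schema schema01 = new Schema.Parser().parse(schema01Json);

// The downloaded files keep their names on GCS, so the path still tells us
// which schema applies and we can branch on it:
PCollection<String> paths = input.apply("Ftp download",
    ParDo.of(FtpTransferFileFn.from(options.getFtpHost())));

paths
    .apply("Only *01*", Filter.by((String p) -> p.contains("01")))
    .apply("Read 01", TextIO.readAll())
    .apply("To records 01", ParDo.of(new CsvToRecordFn(schema01Json)))
    .setCoder(AvroCoder.of(schema01)) // explicit coder for GenericRecord
    .apply("Write 01", AvroIO.writeGenericRecords(schema01)
        .to(options.getOutput() + "_01") // assumes getOutput() is a String
        .withCodec(CodecFactory.snappyCodec())
        .withSuffix(".avro"));
// ...and the same three steps again for *02* with schema_02.avsc.

As far as I can tell, AvroIO also supports dynamic destinations (writeCustomTypeToGenericRecords) if the number of schemas grows, but with only two fixed schemas the branching version is easier to reason about.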
Thank & Regards
On Fri, Feb 1, 2019 at 6:53 AM Csaba Kassai <[email protected]> wrote:
> Hi,
>
> you can output the path where you saved the files on GCS in your first
> DoFn and use the TextIO.readAll() method.
> Also, it is better to initialise the FTPClient in the @Setup method instead
> of every time you process an element in the @ProcessElement method.
> Something like this:
> PCollection<FtpInput> input = ...
> PCollection<String> fileTransfers = input.apply("Transfer FTP",
> ParDo.of(FtpToGcsDoFn.from(options.getFtpHost())))
> .apply("Read File",
> TextIO.readAll()) // This is not correct ...
> .apply("Read CSV LINES ", .....)
> .apply("Convert to AVRO".....)
> .apply("Save in AVRO",...)
>
> public class FtpToGcsDoFn extends DoFn<FtpInput, String> {
>
> private final ValueProvider<String> ftpHost;
> private FTPClient ftpClient;
>
> public static FtpToGcsDoFn from(ValueProvider<String> ftpHost) {
> return new FtpToGcsDoFn(ftpHost);
> }
>
> private FtpToGcsDoFn(ValueProvider<String> ftpHost) {
> this.ftpHost = ftpHost;
> }
>
> @Setup
> public void setup() throws IOException {
> ftpClient = new FTPClient();
> ftpClient.connect(ftpHost.get());
>
> }
>
> @ProcessElement
> public void processElement(@Element FtpInput f, OutputReceiver<String>
> outputReceiver) throws IOException {
>
> ByteArrayOutputStream download = new ByteArrayOutputStream();
> boolean result = ftpClient.retrieveFile(f.getName(), download);
> String destinationPath = saveCSV(download, f.getName()); // save the
> CSV in Google Cloud Storage
> outputReceiver.output(destinationPath);
> }
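>
> @Teardown
> public void teardown() throws IOException {
> // Optional addition (sketch): close the FTP connection when the worker
> // retires this DoFn instance, so connections are not leaked.
> ftpClient.disconnect();
> }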
> }
>
> Regards,
> Csabi
>
>
>
> On Fri, 1 Feb 2019 at 08:55, Henrique Molina <[email protected]>
> wrote:
>
>> Dear all,
>> I am using an FTP client to download some files dynamically; the files are
>> CSV (this part works fine). As the next step I need to open the files and
>> read their lines.
>>
>> Could somebody help me with the good practices for this approach?
>> I am using Java > Google Dataflow > Apache Beam 2.9.0.
>>
>> PCollection<String> fileTransfers = pipeline.apply("Transfer FTP",
>> ParDo.of(new DoFn<FtpInput, String>() {
>>
>> @ProcessElement
>> public void processElement(ProcessContext c) {
>> ArgsOptions opt = c.getPipelineOptions().as(ArgsOptions.class);
>> FTPClient ftp = new FTPClient();
>> ftp.connect(opt.getFtpHost());
>> ByteArrayOutputStream download = new ByteArrayOutputStream();
>> boolean result = ftp.retrieveFile(c.element().getName(), download);
>> saveCSV(download); // save the CSV in Google Cloud Storage
>> c.output("???");
>> ...
>> }
>> }))
>> .apply("Read File", TextIO.read().from("")) // This is not correct ...
>> .apply("Read CSV LINES", .....)
>> .apply("Convert to AVRO", .....)
>> .apply("Save in AVRO", ...);
>>
>> What I found on the Internet are samples using the easy way: starting the
>> pipeline with TextIO.read().from("hardcoded path").
>> But I can't find an example for my situation. Has someone already faced
>> this challenge?
>>
>> Thanks in advance
>>
>>