Hi,
if you have only 2 schemas, the easiest way is to create two separate
PCollections from the different files and handle them separately in two
processing branches.
You can separate them in the FtpToGcsDoFn using side outputs, like this:
public class FtpToGcsDoFn extends DoFn<FtpToGcsDoFn.FtpInput, String> {

    private final ValueProvider<String> ftpHost;
    private FTPClient ftpClient;
    private final TupleTag<String> schema01TT;
    private final TupleTag<String> schema02TT;

    public FtpToGcsDoFn(ValueProvider<String> ftpHost,
            TupleTag<String> schema01TT, TupleTag<String> schema02TT) {
        this.ftpHost = ftpHost;
        this.schema01TT = schema01TT;
        this.schema02TT = schema02TT;
    }

    ....

    @ProcessElement
    public void processElement(@Element FtpInput f, MultiOutputReceiver outputReceiver) {
        ByteArrayOutputStream download = new ByteArrayOutputStream();
        boolean result = ftpClient.retrieveFile(f.getName(), download);
        String destinationPath = saveCSV(download, f.getName()); // save the CSV in Google Cloud Storage
        // Route the path to the output tag that matches the file's schema.
        switch (f.getName()) {
            case "01.csv":
                outputReceiver.get(schema01TT).output(destinationPath);
                break;
            case "02.csv":
                outputReceiver.get(schema02TT).output(destinationPath);
                break;
            default:
                throw new UnsupportedOperationException("Not supported file: " + f.getName());
        }
    }

    ...
}
And use it like this:

// Declare the tags as anonymous subclasses so Beam can infer the output types.
TupleTag<String> schema01TT = new TupleTag<String>(){};
TupleTag<String> schema02TT = new TupleTag<String>(){};

PCollectionTuple fileDestinationPC = input.apply("Transfer FTP",
    ParDo.of(new FtpToGcsDoFn(options.getFtpHost(), schema01TT, schema02TT))
        .withOutputTags(schema01TT, TupleTagList.of(schema02TT)));

PCollection<String> schema1Files = fileDestinationPC.get(schema01TT);
PCollection<String> schema2Files = fileDestinationPC.get(schema02TT);
Then apply the same chain of transforms to both PCollections, using the
corresponding schema in each branch; see the sketch below.
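For illustration, here is a minimal sketch of what the schema_01 branch could
look like: read the saved files with TextIO.readAll(), convert each line into
a GenericRecord with that branch's schema, and write Avro with
AvroIO.writeGenericRecords(). This is only a sketch with assumptions: the
schema JSON string and output path are placeholders, and the comma split is
naive. Note that org.apache.avro.Schema is not serializable in the Avro
version Beam 2.9.0 uses, so the DoFn keeps the schema as a JSON string and
parses it in @Setup:

final String schema01Json = "..."; // contents of schema_01.avsc, loaded on the driver
final Schema schema01 = new Schema.Parser().parse(schema01Json);

PCollection<GenericRecord> records01 = schema1Files
    .apply("Read 01 CSV", TextIO.readAll())
    .apply("To GenericRecord 01", ParDo.of(new DoFn<String, GenericRecord>() {
        private transient Schema schema;

        @Setup
        public void setup() {
            // Parse once per DoFn instance instead of once per element.
            schema = new Schema.Parser().parse(schema01Json);
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            GenericRecord record = new GenericData.Record(schema);
            String[] rowValues = c.element().split(","); // naive split, assumes no quoted commas
            List<Schema.Field> fields = schema.getFields();
            for (int i = 0; i < fields.size(); i++) {
                record.put(fields.get(i).name(), rowValues[i]);
            }
            c.output(record);
        }
    }))
    // GenericRecord has no default coder, so declare one explicitly with the schema.
    .setCoder(AvroCoder.of(schema01));

records01.apply("Write 01 Avro", AvroIO.writeGenericRecords(schema01)
    .to("gs://your-bucket/output/schema01") // placeholder path
    .withCodec(CodecFactory.snappyCodec())
    .withSuffix(".avro"));

The schema_02 branch is the same chain with schema02 and its own output path.
Because each branch has a fixed schema, you no longer need to pass the schema
name through the pipeline options, and the explicit AvroCoder also solves the
"GenericRecord not serialized" problem from your snippet.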
Regards,
Csabi
On Fri, 1 Feb 2019 at 21:30, Henrique Molina <[email protected]>
wrote:
> Hi Csaba,
> I deeply appreciate your help and support!
> Thanks so much, it is running correctly now and following good practices.
>
> I'd like to take advantage of this email to ask about another step in the
> same pipeline.
> The CSV files will be converted to .avro, and I have two different schemas:
> files coming from FTP as *02*.csv use *schema_02.avsc*, and *01*.csv files
> use *schema_01.avsc*.
> Each schema has a different structure.
> See my attempt below:
> class FtpGcsDoFn extends DoFn<FtpInput, String> {
>
>     @ProcessElement
>     public void processElement(@Element FtpInput f,
>             OutputReceiver<String> outputReceiver, ProcessContext c) {
>
>         ArgsOptions option = c.getPipelineOptions().as(ArgsOptions.class);
>         ....
>         String pathSavedFile = saveFile(download, file.getName(), option);
>         outputReceiver.output(pathSavedFile); // save on GCS
>         option.setCurrentSchema(String.format("schema_%s.avsc", file.getName()));
>     }
>
>     ...
>
>
> .apply("Ftp download", ParDo.of( FtpTransferFileFn.from(options
> .getFtpHost()))
>
> .apply("Read csv"
> ,TextIO.readAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
>
> .apply("Read line", ParDo.of(new DoFn<String,
> String>(){
>
> @ProcessElement
>
> public void processElement(ProcessContext c) {
>
> ArgsOptions opt= c
> .getPipelineOptions().as(ArgsOptions.class);
>
> String nameFileSchema=
> opt.getCurrentSchema().get();
> ** This method Not working, return always the last option.setCurrentSchema(
> ...) in class FtpGcsDoFn
>
> Schema schema = new
> Schema.Parser().parse(schema);
>
> String[] rowValues = c.element().split(
> ",");
>
> // Create Avro Generic Record
>
> GenericRecord genericRecord = new
> GenericData.Record(schema);
>
> for (int index = 0; index < fields.size(); ++
> index) {
>
> Schema.Field field = fields.get(index);
>
> genericRecord.put(field.name(), rowValues[
> index]);
>
> }
>
> c.output(genericRecord); // NO serialized
> GenericRecord :'(
>
> }
>
> }))
>
> .apply("Write Avro formatted data", AvroIO
> .writeGenericRecords(SCHEMA_DINAMIC ?? ) // How I get different
> schema... :`(
>
> .to(options.getOutput()).withCodec(
> CodecFactory.snappyCodec()).withSuffix(".avro"));
>
>
> Thank & Regards
>
> On Fri, Feb 1, 2019 at 6:53 AM Csaba Kassai <[email protected]> wrote:
>
>> Hi,
>>
>> you can output the path where you saved the files on GCS in your first
>> DoFn and use the TextIO.readAll() method.
>> Also, it is better to initialise the FTPClient in the @Setup method
>> instead of every time you process an element in the @ProcessElement
>> method.
>> Something like this:
>> PCollection<FtpInput> input = ...
>> PCollection<String> fileTransfers = input.apply("Transfer FTP",
>> ParDo.of(FtpToGcsDoFn.from(options.getFtpHost())))
>> .apply("Read File",
>> TextIO.readAll()) // This is not correct ...
>> .apply("Read CSV LINES ", .....)
>> .apply("Convert to AVRO".....)
>> .apply("Save in AVRO",...)
>>
>> public class FtpToGcsDoFn extends DoFn<FtpInput, String> {
>>
>> private final ValueProvider<String> ftpHost;
>> private FTPClient ftpClient;
>>
>> public static FtpToGcsDoFn from(ValueProvider<String> ftpHost) {
>> return new FtpToGcsDoFn(ftpHost);
>> }
>>
>> private FtpToGcsDoFn(ValueProvider<String> ftpHost) {
>> this.ftpHost = ftpHost;
>> }
>>
>> @Setup
>> public void setup() {
>> ftpClient = new FTPClient();
>> ftpClient.connect(ftpHost.get());
>>
>> }
>>
>> @ProcessElement
>> public void processElement(@Element FtpInput f, OutputReceiver<String> outputReceiver) {
>>
>> ByteArrayOutputStream download = new ByteArrayOutputStream();
>> boolean result = ftpClient.retrieveFile(f.getName(), download);
>> String destinationPath = saveCSV(download, f.getName()); // save the CSV in Google Cloud Storage
>> outputReceiver.output(destinationPath);
>> }
>> }
>>
>> Regards,
>> Csabi
>>
>>
>>
>> On Fri, 1 Feb 2019 at 08:55, Henrique Molina <[email protected]>
>> wrote:
>>
>>> Dear all,
>>> I'm using an FTP client to download some files dynamically; the files are
>>> CSV (this part is working fine).
>>> In the next step I need to open the files and read their lines.
>>>
>>> Could somebody help me with good practices for this approach?
>>> I'm using Java on Google Dataflow with Apache Beam 2.9.0.
>>>
>>> PCollection<String> fileTransfers= pipeline.apply("Transfer FTP", new
>>> DoFn<FtpInput, String>{
>>>
>>> @ProcessElement
>>>
>>> public void processElement(ProcessContext c) {
>>>
>>> ArgsOptions opt= c.getPipelineOptions().as(ArgsOptions.class);
>>>
>>> FTPClient ftp = new FTPClient();
>>>
>>> ftp.connect(opt.getFtpHost());
>>>
>>> ByteArrayOutputStream download = new
>>> ByteArrayOutputStream();
>>>
>>> boolean result= ftp.retrieveFile(f.getName(),
>>> download);
>>>
>>> saveCSV(download); // save CSV in Storage
>>> Google cloud
>>>
>>> c.output("???");
>>>
>>> ...
>>> })
>>> .apply("Read File", TextIO.read().from("")); // This is not correct ...
>>> .apply("Read CSV LINES ", .....);
>>> .appply("Convert to AVRO".....) ;
>>> .apply("Save in AVRO",...);
>>>
>>> What I found on the Internet are samples that take the easy way:
>>> starting the pipeline with TextIO.read().from("hardcoded path").
>>> But I can't find an example for my situation.
>>> Has anyone already faced this challenge?
>>>
>>> Thanks in advance
>>>
>>>