Thank you Priyanka. We need a suggestion from the DataTorrent team. In our use case we need to read around 120 directories in parallel, and we would like to keep operator memory (with container-local locality) as low as possible to reduce the burden on the cluster. As long as cluster resources are sufficient, we can run the DT application continuously with a predefined scan interval.
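The memory trade-off can be made concrete with a small calculation. The following hypothetical sketch spreads the 120 directories over a fixed number of reader partitions, round-robin, and estimates total memory. It assumes one container per partition. The partition count (8), the 512 MB per-partition figure, the directory paths, and the class name are illustrative assumptions. They are not DataTorrent defaults or an Apex API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: assign input directories to reader partitions
 * round-robin and estimate total memory, ASSUMING one container per
 * partition. Values used in main() are illustrative only.
 */
public class DirectoryPartitionSketch {

    /** Returns one list of directories per partition (round-robin assignment). */
    public static List<List<String>> assign(List<String> directories, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        List<List<String>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < directories.size(); i++) {
            result.get(i % partitions).add(directories.get(i));
        }
        return result;
    }

    /** Rough total memory if every partition gets its own container. */
    public static long totalMemoryMb(int partitions, long memoryPerPartitionMb) {
        return partitions * memoryPerPartitionMb;
    }

    public static void main(String[] args) {
        List<String> dirs = new ArrayList<>();
        for (int i = 1; i <= 120; i++) {
            dirs.add("/data/in/dir" + i); // hypothetical input paths
        }
        List<List<String>> plan = assign(dirs, 8);
        System.out.println(plan.get(0).size());    // prints 15
        System.out.println(totalMemoryMb(8, 512)); // prints 4096
    }
}
```

Fewer partitions lower the total memory. Each partition, however, then has more directories to scan per interval.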
My concern is whether we can run the DT application in batch mode with an external tool such as Oozie. We want to start and stop the application at predefined times of the day instead of running it continuously, to reduce the burden on the cluster during peak hours. I heard that a DT application releases memory by default when it is not in use, so we would not need to worry when the application is not streaming. Please throw some light on this.

Regards,
Surya Vamshi

From: Priyanka Gugale [mailto:[email protected]]
Sent: 2016, July, 11 7:11 AM
To: [email protected]
Subject: Re: Inputs needed on File Writer

Hi,

Check this app: https://github.com/apache/apex-malhar/tree/master/apps/filecopy

It is for HDFS-to-HDFS copy, but I could use the same app to copy from HDFS to FTP, as the HDFS API supports FTP as well. Please note the following property I used to run the app:

<property>
  <name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
  <value>ftp://ftpadmin:ftpadmin@localhost:21/home/ftp/ftpadmin/out</value>
</property>

-Priyanka

On Sat, Jul 9, 2016 at 12:33 PM, Priyanka Gugale <[email protected]> wrote:

I'm traveling over the weekend; I will get back to you on Monday.

-Priyanka

On Jul 8, 2016 8:21 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)" <[email protected]> wrote:

Thank you Priyanka. Do you have any example that uses this operator for FTP?

Regards,
Surya Vamshi

From: Priyanka Gugale [mailto:[email protected]]
Sent: 2016, July, 08 10:48 AM
To: [email protected]
Subject: RE: Inputs needed on File Writer

Yes, FTP is supported, but SFTP is not.

-Priyanka

On Jul 8, 2016 7:00 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)" <[email protected]> wrote:

Hi Priyanka,

Thank you for your inputs. This may be a dumb question: in my previous communications with DataTorrent, I heard that SFTP is not supported for now. Does that mean FTP is supported and SFTP is not?
Please clarify the difference.

Regards,
Surya Vamshi

From: Priyanka Gugale [mailto:[email protected]]
Sent: 2016, July, 08 12:07 AM
To: [email protected]
Subject: Re: Inputs needed on File Writer

Hi,

The file will be available after the window is committed. You can override the committed() call and start your thread after super.committed() has been called. You might want to double-check that the file is actually finalized before starting your thread.

For your use case, I would suggest using AbstractFileOutputOperator to write the file directly to FTP.

-Priyanka

On Fri, Jul 8, 2016 at 12:41 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <[email protected]> wrote:

Hi,

Can you please let me know what happens when the requestFinalize() method is called, as in the code below? Once the output files are written to HDFS, I would like to start a thread that reads the HDFS files and copies them to an FTP location. I am trying to understand when I can trigger that thread.
####################### File Writer ##########################################

package com.rbc.aml.cnscan.operator;

import com.datatorrent.api.Context;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.rbc.aml.cnscan.utils.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class FileWriter extends AbstractFileOutputOperator<KeyValue<String, String>>
{
  private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);

  // Files whose end was signalled by a tuple with a null value
  private List<String> filesToFinalize = new ArrayList<>();

  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);
    // Handle any pending names that may have been restored from a checkpoint
    finalizeFiles();
  }

  @Override
  protected byte[] getBytesForTuple(KeyValue<String, String> tuple)
  {
    if (tuple.value == null) {
      // A null value marks the end of a file: remember it and write nothing
      LOG.debug("File to finalize {}", tuple.key);
      filesToFinalize.add(tuple.key);
      return new byte[0];
    } else {
      return tuple.value.getBytes();
    }
  }

  @Override
  protected String getFileName(KeyValue<String, String> tuple)
  {
    return tuple.key;
  }

  @Override
  public void endWindow()
  {
    // Pass the list as an argument so that the {} placeholder is filled in
    LOG.info("end window is called, files are: {}", filesToFinalize);
    super.endWindow();
    finalizeFiles();
  }

  private void finalizeFiles()
  {
    // Log the list itself; passing toArray() would spread it over the varargs
    LOG.debug("Files to finalize {}", filesToFinalize);
    Iterator<String> fileIt = filesToFinalize.iterator();
    while (fileIt.hasNext()) {
      // Only requests finalization; the file is available once the window is committed
      requestFinalize(fileIt.next());
      fileIt.remove();
    }
  }
}

####################################################################################################

_______________________________________________________________________
If you received this email in error, please advise the sender (by return email or otherwise) immediately. You have consented to receive the attached electronically at the above-noted email address; please retain a copy of this confirmation for future reference.
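Priyanka's advice in the thread is to start the HDFS-to-FTP copy only after super.committed() has returned, because a file is available only once its window is committed. That idea can be modelled in plain Java without any Apex dependency. The sketch below makes these assumptions:

* CommitHandoffSketch, its requestFinalize(long, String) and committed(long) methods, and the copier callback are hypothetical stand-ins for the real operator hooks.
* Files requested for finalization in a window are treated as ready once that window, or any later one, is committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical plain-Java model: hand a file to a background copy task only
 * after the window in which its finalization was requested is committed.
 */
public class CommitHandoffSketch {
    // window id -> files whose finalization was requested in that window
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final Consumer<String> copier; // hypothetical HDFS -> FTP copy task
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    public CommitHandoffSketch(Consumer<String> copier) {
        this.copier = copier;
    }

    /** Records a finalize request made during the given window. */
    public void requestFinalize(long windowId, String file) {
        pending.computeIfAbsent(windowId, w -> new ArrayList<>()).add(file);
    }

    /** On commit of windowId, hands off every file from windows <= windowId. */
    public List<String> committed(long windowId) {
        NavigableMap<Long, List<String>> done = pending.headMap(windowId, true);
        List<String> ready = new ArrayList<>();
        for (List<String> files : done.values()) {
            ready.addAll(files);
        }
        done.clear(); // headMap is a view, so this also removes them from pending
        for (String file : ready) {
            pool.submit(() -> copier.accept(file)); // copy runs off the operator thread
        }
        return ready;
    }

    /** Stops accepting copies and waits briefly for running ones to finish. */
    public void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a real operator, the pending map would need to be part of the checkpointed operator state. The copy task should still confirm that the final file exists before uploading it, as Priyanka suggests.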
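The outputDirectoryPath value Priyanka used packs the FTP user, password, host, port, and directory into one URI. The hedged sketch below uses only java.net.URI to show how that string breaks down. It also rejects an sftp:// path, reflecting the thread's statement that FTP is supported but SFTP is not. OutputPathCheck and its hdfs/ftp allowlist are hypothetical and not part of Malhar.

```java
import java.net.URI;

/**
 * Hypothetical helper: describe an output path URI and reject schemes
 * outside an assumed hdfs/ftp allowlist (SFTP was not supported per the thread).
 */
public class OutputPathCheck {

    public static String describe(String path) {
        URI uri = URI.create(path);
        String scheme = uri.getScheme();
        if (!"ftp".equals(scheme) && !"hdfs".equals(scheme)) {
            throw new IllegalArgumentException("unsupported scheme: " + scheme);
        }
        // Keep only the user name; never echo the password from "user:password"
        String userInfo = uri.getUserInfo();
        String user = userInfo == null ? "" : userInfo.split(":", 2)[0];
        return scheme + " user=" + user + " host=" + uri.getHost()
                + " port=" + uri.getPort() + " path=" + uri.getPath();
    }

    public static void main(String[] args) {
        // prints: ftp user=ftpadmin host=localhost port=21 path=/home/ftp/ftpadmin/out
        System.out.println(describe("ftp://ftpadmin:ftpadmin@localhost:21/home/ftp/ftpadmin/out"));
    }
}
```

The description leaves the password out, which keeps the credentials embedded in the property value out of logs.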
