Hi,
Check this app: https://github.com/apache/apex-malhar/tree/master/apps/filecopy
It is written for HDFS-to-HDFS copy, but I could use the same app to copy from HDFS to
FTP, since the HDFS API supports FTP as well.
Please note the following property, which I used to run the app:
<property>
<name>dt.operator.HDFSFileCopyModule.prop.outputDirectoryPath</name>
<value>ftp://ftpadmin:ftpadmin@localhost:21/home/ftp/ftpadmin/out</value>
</property>
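For reference, here is a minimal sketch of how that value breaks down into user, password, host, port and path. FtpTarget is a hypothetical helper written only for illustration (it is not part of Apex or Hadoop), and it assumes the value is a single line with no whitespace inside it.

```java
import java.net.URI;

/** Hypothetical helper (not an Apex or Hadoop class): splits an ftp:// output path into its parts. */
final class FtpTarget {
    final String user;      // null when the URI carries no credentials
    final String password;  // null when only a user name is given
    final String host;
    final int port;
    final String path;

    private FtpTarget(String user, String password, String host, int port, String path) {
        this.user = user;
        this.password = password;
        this.host = host;
        this.port = port;
        this.path = path;
    }

    static FtpTarget parse(String value) {
        // URI.create throws IllegalArgumentException on illegal characters,
        // for example a line break in the middle of the value.
        URI uri = URI.create(value.trim());
        if (!"ftp".equalsIgnoreCase(uri.getScheme())) {
            throw new IllegalArgumentException("not an ftp URI: " + value);
        }
        String user = null;
        String password = null;
        String userInfo = uri.getUserInfo(); // "user:password", "user", or null
        if (userInfo != null) {
            int colon = userInfo.indexOf(':');
            user = colon < 0 ? userInfo : userInfo.substring(0, colon);
            password = colon < 0 ? null : userInfo.substring(colon + 1);
        }
        // Assumption for this sketch: fall back to the standard FTP port when none is given.
        int port = uri.getPort() == -1 ? 21 : uri.getPort();
        return new FtpTarget(user, password, uri.getHost(), port, uri.getPath());
    }

    public static void main(String[] args) {
        FtpTarget t = parse("ftp://ftpadmin:ftpadmin@localhost:21/home/ftp/ftpadmin/out");
        System.out.println(t.user + "@" + t.host + ":" + t.port + " -> " + t.path);
    }
}
```

A value that is wrapped across two lines would be rejected by this parser, so it is worth keeping the whole URI on one line in the properties file.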
-Priyanka
On Sat, Jul 9, 2016 at 12:33 PM, Priyanka Gugale <[email protected]>
wrote:
> I'm traveling over the weekend and will get back to you on Monday.
>
> -Priyanka
> On Jul 8, 2016 8:21 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)" <
> [email protected]> wrote:
>
>> Thank you, Priyanka. Do you have an example that uses this operator for
>> FTP?
>>
>>
>>
>> Regards,
>>
>> Surya Vamshi
>>
>>
>>
>> *From:* Priyanka Gugale [mailto:[email protected]]
>> *Sent:* 2016, July, 08 10:48 AM
>> *To:* [email protected]
>> *Subject:* RE: Inputs needed on File Writer
>>
>>
>>
>> Yes, FTP is supported, but SFTP is not.
>>
>> -Priyanka
>>
>> On Jul 8, 2016 7:00 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)" <
>> [email protected]> wrote:
>>
>> Hi Priyanka,
>>
>>
>>
>> Thank you for your inputs.
>>
>>
>>
>> This may be a basic question: in my previous communications, DataTorrent told
>> me that SFTP is not supported for now. Does that mean FTP is supported
>> and SFTP is not? Please clarify the difference.
>>
>>
>>
>> Regards,
>>
>> Surya Vamshi
>>
>>
>>
>> *From:* Priyanka Gugale [mailto:[email protected]]
>> *Sent:* 2016, July, 08 12:07 AM
>> *To:* [email protected]
>> *Subject:* Re: Inputs needed on File Writer
>>
>>
>>
>> Hi,
>>
>>
>>
>> The file will be available after the window is committed. You can override
>> the committed(long windowId) callback and start your thread after
>> super.committed(windowId) has been called. You might want to double-check
>> that the file is actually finalized before starting your thread.
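
The ordering described above can be sketched as follows. This is only an illustration under stated assumptions: SketchOutputOperator is a hypothetical stand-in that models just the commit callback, not the real AbstractFileOutputOperator, and the "copy" is a placeholder that records the file name instead of talking to HDFS or FTP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the file output operator; it models only the commit callback. */
class SketchOutputOperator {
    // Files requested for finalization, keyed by the window in which the request was made.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Files finalized by the most recent committed() call.
    protected final List<String> justFinalized = new ArrayList<>();

    void requestFinalize(long windowId, String fileName) {
        pending.computeIfAbsent(windowId, k -> new ArrayList<>()).add(fileName);
    }

    /** Invoked once a window is committed; finalizes everything requested up to that window. */
    public void committed(long windowId) {
        justFinalized.clear();
        Map<Long, List<String>> done = pending.headMap(windowId, true);
        done.values().forEach(justFinalized::addAll);
        done.clear(); // the view is backed by 'pending', so this removes the entries there too
    }
}

/** Starts the copy only after the parent class has handled the commit. */
class CopyAfterCommitOperator extends SketchOutputOperator {
    private final ExecutorService copier = Executors.newSingleThreadExecutor();
    final List<String> copied = new CopyOnWriteArrayList<>();

    @Override
    public void committed(long windowId) {
        super.committed(windowId); // in this sketch, files become final here
        for (String file : new ArrayList<>(justFinalized)) {
            // Placeholder for the real HDFS-to-FTP copy.
            copier.execute(() -> copied.add(file));
        }
    }

    /** Waits for queued copies; returns true if they all finished in time. */
    boolean shutdownAndWait() {
        copier.shutdown();
        try {
            return copier.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Running the copy on a separate executor keeps the callback itself short. A file requested in a window later than the committed one is left alone until its own window is committed.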
>>
>>
>>
>> For your use case, I would suggest using AbstractFileOutputOperator to
>> write the file directly to FTP.
>>
>>
>>
>> -Priyanka
>>
>>
>>
>> On Fri, Jul 8, 2016 at 12:41 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
>> [email protected]> wrote:
>>
>> Hi ,
>>
>>
>>
>> Can you please let me know what happens when the requestFinalize() method
>> is called, as in the code below?
>>
>>
>>
>> Once the output files are written to HDFS, I would like to start a
>> thread that reads the HDFS files and copies them to an FTP location. So I am
>> trying to understand when I can trigger the thread.
>>
>>
>>
>> ####################### File Writer
>> ##########################################
>>
>>
>>
>> package com.rbc.aml.cnscan.operator;
>>
>> import com.datatorrent.api.Context;
>> import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
>> import com.rbc.aml.cnscan.utils.KeyValue;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.util.ArrayList;
>> import java.util.Iterator;
>> import java.util.List;
>>
>> public class FileWriter extends AbstractFileOutputOperator<KeyValue<String, String>> {
>>   private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
>>   // Names of files whose end-of-file marker (a tuple with a null value) has arrived.
>>   private List<String> filesToFinalize = new ArrayList<>();
>>
>>   @Override
>>   public void setup(Context.OperatorContext context) {
>>     super.setup(context);
>>     // Finalize anything still pending in filesToFinalize when the operator starts.
>>     finalizeFiles();
>>   }
>>
>>   @Override
>>   protected byte[] getBytesForTuple(KeyValue<String, String> tuple) {
>>     if (tuple.value == null) {
>>       // A null value marks the end of the file named by the key.
>>       LOG.debug("File to finalize {}", tuple.key);
>>       filesToFinalize.add(tuple.key);
>>       return new byte[0];
>>     } else {
>>       return tuple.value.getBytes();
>>     }
>>   }
>>
>>   @Override
>>   protected String getFileName(KeyValue<String, String> tuple) {
>>     return tuple.key;
>>   }
>>
>>   @Override
>>   public void endWindow() {
>>     // Pass the list as a logger argument so the {} placeholder is filled in.
>>     LOG.info("end window is called, files are: {}", filesToFinalize);
>>     super.endWindow();
>>     finalizeFiles();
>>   }
>>
>>   private void finalizeFiles() {
>>     LOG.debug("Files to finalize {}", filesToFinalize);
>>     Iterator<String> fileIt = filesToFinalize.iterator();
>>     while (fileIt.hasNext()) {
>>       requestFinalize(fileIt.next());
>>       fileIt.remove();
>>     }
>>   }
>> }
>>
>>
>> ####################################################################################################
>>
>>
>>
>> _______________________________________________________________________
>>
>> If you received this email in error, please advise the sender (by return
>> email or otherwise) immediately. You have consented to receive the attached
>> electronically at the above-noted email address; please retain a copy of
>> this confirmation for future reference.
>>
>> Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur
>> immédiatement, par retour de courriel ou par un autre moyen. Vous avez
>> accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à
>> l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de
>> cette confirmation pour les fins de reference future.
>>