Hi,

The file will be available after window is committed, you can overwrite
committed call and start your thread after super.commit is called. You
might want to double check if file is actually finalized before starting
your thread..

For your usecase I would suggest you to use AbstractFileOutputOperator to
directly write file to ftp.

-Priyanka

On Fri, Jul 8, 2016 at 12:41 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <
[email protected]> wrote:

> Hi ,
>
> Can you please let me know what happen when the requestFinalize() method
> is called as per below ?
>
> Once the output files are written to HDFS, I would like to initiate a
> thread that reads the HDFS files and copies to FTP location. So I am trying
> to understand when can I trigger the thread.
>
> ####################### File Writer
> ##########################################
>
> package com.rbc.aml.cnscan.operator;
>
> import com.datatorrent.api.Context;
> import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
> import com.rbc.aml.cnscan.utils.KeyValue;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
>
> public class FileWriter extends
> AbstractFileOutputOperator<KeyValue<String, String>> {
>     private static final Logger LOG =
> LoggerFactory.getLogger(FileWriter.class);
>     private List<String> filesToFinalize = new ArrayList<>();
>
>     @Override
>     public void setup(Context.OperatorContext context) {
>         super.setup(context);
>         finalizeFiles();
>     }
>
>     @Override
>     protected byte[] getBytesForTuple(KeyValue<String, String> tuple) {
>         if (tuple.value == null) {
>          LOG.debug("File to finalize {}",tuple.key);
>             filesToFinalize.add(tuple.key);
>             return new byte[0];
>         }
>         else {
>             return tuple.value.getBytes();
>         }
>     }
>
>     @Override
>     protected String getFileName(KeyValue<String, String> tuple) {
>         return tuple.key;
>     }
>
>     @Override
>     public void endWindow() {
>          LOG.info("end window is called, files are :{}"+filesToFinalize);
>         super.endWindow();
>         finalizeFiles();
>     }
>
>     private void finalizeFiles() {
>          LOG.debug("Files to finalize {}",filesToFinalize.toArray());
>         Iterator<String> fileIt = filesToFinalize.iterator();
>         while(fileIt.hasNext()) {
>             requestFinalize(fileIt.next());
>             fileIt.remove();
>         }
>     }
> }
>
> ####################################################################################################
>
>
> _______________________________________________________________________
>
> If you received this email in error, please advise the sender (by return
> email or otherwise) immediately. You have consented to receive the attached
> electronically at the above-noted email address; please retain a copy of
> this confirmation for future reference.
>
> Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur
> immédiatement, par retour de courriel ou par un autre moyen. Vous avez
> accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à
> l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de
> cette confirmation pour les fins de reference future.
>
>

Reply via email to