Hi ,
Can you please let me know what happen when the requestFinalize() method is
called as per below ?
Once the output files are written to HDFS, I would like to initiate a thread
that reads the HDFS files and copies to FTP location. So I am trying to
understand when can I trigger the thread.
####################### File Writer ##########################################
package com.rbc.aml.cnscan.operator;
import com.datatorrent.api.Context;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.rbc.aml.cnscan.utils.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class FileWriter extends AbstractFileOutputOperator<KeyValue<String,
String>> {
private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
private List<String> filesToFinalize = new ArrayList<>();
@Override
public void setup(Context.OperatorContext context) {
super.setup(context);
finalizeFiles();
}
@Override
protected byte[] getBytesForTuple(KeyValue<String, String> tuple) {
if (tuple.value == null) {
LOG.debug("File to finalize {}",tuple.key);
filesToFinalize.add(tuple.key);
return new byte[0];
}
else {
return tuple.value.getBytes();
}
}
@Override
protected String getFileName(KeyValue<String, String> tuple) {
return tuple.key;
}
@Override
public void endWindow() {
LOG.info("end window is called, files are :{}"+filesToFinalize);
super.endWindow();
finalizeFiles();
}
private void finalizeFiles() {
LOG.debug("Files to finalize {}",filesToFinalize.toArray());
Iterator<String> fileIt = filesToFinalize.iterator();
while(fileIt.hasNext()) {
requestFinalize(fileIt.next());
fileIt.remove();
}
}
}
####################################################################################################
_______________________________________________________________________
If you received this email in error, please advise the sender (by return email
or otherwise) immediately. You have consented to receive the attached
electronically at the above-noted email address; please retain a copy of this
confirmation for future reference.
Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur
immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté
de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse
courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation
pour les fins de reference future.