Hi Susheel,

Thanks for the reply.
I tried emitting path.toString() as the message ID in my spout, like below:

    this.collector.emit( new Values( path.toString() ), path.toString() );

And in my bolt the emit and ack are like below:

    outputCollector.emit( tuple, new Values( currentLine ) );
    outputCollector.ack( tuple );

But I still do not see the ack arriving at the spout. Can you please point out what I am doing wrong here?

Regards,
Rajeev.

On 21 January 2018 at 21:41, Susheel Kumar Gadalay <[email protected]> wrote:

> I think this emit from bolt is not correct.
> outputCollector.emit( tuple, new Values( currentLine ) );
> You have to anchor the same msgid of spout.
>
>
> On 1/21/18, Rajeev <[email protected]> wrote:
> > Hi,
> >
> > I have a simple topology as below:
> > TopologyBuilder builder = new TopologyBuilder();
> > builder.setSpout( "DirSpout", new DirectoryScan() );
> > builder.setBolt( "printBeacon", new FileSource(), 1 ).shuffleGrouping( "DirSpout" );
> > Below is my Spout:
> >
> > public class DirectoryScan extends BaseRichSpout {
> >
> >     private String inputDirectory;
> >     private String compressionType;
> >     private String fileNamePattern;
> >     private boolean validationNeeded;
> >     private String validPattern;
> >     private File validationFile;
> >     private SpoutOutputCollector collector;
> >     @Override
> >     public void ack( Object msgId ) {
> >         System.out.println("acked with: " + msgId );
> >         Tuple tuple = ( Tuple ) msgId;
> >         String path = tuple.getString( 0 );
> >         System.out.println( "Path acked: " + path );
> >     }
> >
> >     @Override
> >     public void fail(Object msgId) {
> >         System.out.println( "I failed you: " + msgId.toString() );
> >     }
> >     @Override
> >     public void open( Map conf, TopologyContext context, SpoutOutputCollector collector ) {
> >
> >         this.collector = collector;
> >         inputDirectory = ( String ) conf.get( "inputDirectory" );
> >         compressionType = ( String ) conf.get( "compressionType" );
> >         fileNamePattern = ( String ) conf.get( "fileNamePattern" );
> >         validationNeeded = ( Boolean ) conf.get( "validationNeeded" );
> >         validPattern = ( String ) conf.get( "validPattern" );
> >     }
> >
> >     @Override
> >     public void nextTuple() {
> >         int i = 1;
> >         File dir = new File( inputDirectory );
> >         FileFilter filteredFiles = new RegexFileFilter( fileNamePattern );
> >
> >         int maxFiles = 4;
> >         while( true ) {
> >             System.out.println( "NIO run" );
> >             long start = System.currentTimeMillis();
> >             Path pdir = FileSystems.getDefault().getPath( inputDirectory );
> >             try {
> >                 Thread.sleep(5000);
> >                 DirectoryStream<Path> stream = Files.newDirectoryStream( pdir, fileNamePattern );
> >                 for (Path path : stream) {
> >                     if ( i > maxFiles) break;
> >                     System.out.println( "" + i + ": " + path.getFileName() );
> >                     Object msgId = "ID " + i;
> >                     this.collector.emit( new Values( path.toString() ), msgId );
> >                     i ++;
> >                 }
> >                 stream.close();
> >             } catch (IOException | InterruptedException e) {
> >                 // TODO Auto-generated catch block
> >                 e.printStackTrace();
> >             }
> >             long stop = System.currentTimeMillis();
> >             System.out.println( "Elapsed: " + (stop - start) + " ms" );
> >         }
> >     }
> >
> >     @Override
> >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >         declarer.declare( new Fields( "File" ) );
> >     }
> >
> >     public static class FileComparator implements Comparator< File > {
> >
> >         public int compare( File fileA, File fileB ) {
> >             long lastModifiedA = fileA.lastModified();
> >             long lastModifiedB = fileB.lastModified();
> >
> >             if ( lastModifiedA > lastModifiedB )
> >                 return 1;
> >             else if ( lastModifiedB > lastModifiedA )
> >                 return -1;
> >
> >             return 0;
> >         }
> >     }
> >
> > }
> >
> >
> > Below is my Bolt:
> > public class FileSource extends BaseRichBolt {
> >
> >     private OutputCollector outputCollector = null;
> >     private String processedDirectory;
> >     private boolean deleteFile;
> >     private BufferedReader bufferedReader = null;
> >     private FileReader fileReader = null;
> >     private File thisFile = null;
> >     private String thisFileName = null;
> >     private boolean validationNeeded;
> >     private String validPattern;
> >     private File validationFile;
> >
> >     @Override
> >     public void prepare( Map stormConf, TopologyContext context, OutputCollector collector ) {
> >
> >         processedDirectory = ( String ) stormConf.get( "processedDirectory" );
> >         deleteFile = ( Boolean ) stormConf.get( "deleteFile" );
> >         validationNeeded = ( Boolean ) stormConf.get( "validationNeeded" );
> >         validPattern = ( String ) stormConf.get( "validPattern" );
> >
> >         outputCollector = collector;
> >     }
> >
> >     @Override
> >     public void execute( Tuple tuple ) {
> >
> >         String file = tuple.getStringByField( "File" );
> >         try {
> >             File thisFile = new File( file );
> >             Path path = thisFile.toPath();
> >             BufferedReader reader = Files.newBufferedReader( path, Charset.forName("UTF-8") );
> >
> >             String currentLine = null;
> >             while((currentLine = reader.readLine()) != null){
> >                 System.out.println( "Printing within filesource: " + currentLine );
> >                 outputCollector.emit( tuple, new Values( currentLine ) );
> >             }
> >             outputCollector.ack( tuple );
> >             System.out.println("end of file read: " + path.toString());
> >             thisFile.renameTo( new File( processedDirectory + "/" + thisFile.getName() ) );
> >
> >         } catch ( IOException e ) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     @Override
> >     public void declareOutputFields( OutputFieldsDeclarer declarer ) {
> >         declarer.declare( new Fields( "record" ) );
> >     }
> > }
> >
> >
> > But the above is not calling the ack method in the spout. Can you please
> > let me know what is wrong in this code.
> >
> > Regards,
> > Rajeev.
> >
>
