Hi,

I designed a sample Storm topology that contains one spout and one bolt. The spout reads from files (currently 10 files, each with 1,000 records) and emits each line to the bolt; the bolt writes every tuple it receives to an output file.

In total my spout emits 10,000 records, and my bolt should receive all 10,000 tuples and write them to the file. The problem: in local mode everything works fine, but in cluster mode, with any number of workers, my bolt receives/writes only about 5,000 to 7,000 records (the number varies randomly). I am losing data, please help!

Below is my application code.
----------------------------------------------
MainApplication.java
....................
public class MainApplication {
    public static void main(String[] args) throws Exception {
        Config con = new Config();
        con.setDebug(true);
        int i = Integer.parseInt(args[0]);
        con.put("fileLocation", args[1]);
        con.put("outFile", args[2]);
        con.put("ext", ".txt");
        con.setNumWorkers(Integer.parseInt(args[4]));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("FileREADING-SPOUT", new FileReadingSpout(), i);
        builder.setBolt("TEST-BOLT", new TestBolt(), i).shuffleGrouping("FileREADING-SPOUT");

        StormSubmitter.submitTopology(args[3], con, builder.createTopology());
    }
}
FileReadingSpout.java
.....................
public class FileReadingSpout implements IRichSpout {
    static int count = 0;
    long windowTime;
    SpoutOutputCollector collector;
    TopologyContext context;
    String fileLocation, ext;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            System.out.println("********************************************************************");
            System.out.println(" FileReadingSpout ********* " + new Date()
                    + " acker" + conf.get("TOPOLOGY_MESSAGE_TIMEOUT_SECS"));
            System.out.println("********************************************************************");
            this.context = context;
            this.collector = collector;
            this.fileLocation = conf.get("fileLocation").toString();
            this.ext = conf.get("ext").toString();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() { }

    @Override
    public void activate() { }

    @Override
    public void deactivate() { }

    @Override
    public void nextTuple() {
        try {
            System.out.println(Thread.currentThread().getName() + " inside ReadingSpout ...* " + new Date());
            String l = getFileNames(fileLocation, ext);
            if (l.length() > 0) {
                System.out.println("** list of files count:" + l);
                File oldName = new File(l);
                String file = l.replace(".txt", ".Reading");
                File newName = new File(file);
                oldName.renameTo(newName);
                readFiles(file);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println(count + " inside ack ::* " + msgId);
    }

    @Override
    public void fail(Object msgId) { }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public static long stringToLong(String st) {
        long result = 0L;
        try {
            if (st != null) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date d = sdf.parse(st);
                result = d.getTime();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    public void readFiles(String fileName) {
        String data = null;
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(fileName));
            System.out.println("=====================*====================");
            System.out.println(" current reading fileName is :" + fileName);
            while ((data = reader.readLine()) != null && data.length() > 0) {
                count++;
                collector.emit(new Values(data), data);
            }
        } catch (IOException e) {
            System.out.println(" file not found :" + e);
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                    System.out.println(" reader is closed successfully!");
                }
            } catch (IOException e) {
                System.out.println("reader is not closed properly.." + e);
            }
        }
    }

    public String getFileNames(String folderpath, String extention) {
        File file = new File(folderpath);
        final String ext = extention;
        File[] files = file.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String name) {
                return name.toLowerCase().endsWith(ext) && !name.contains("ReadingComplete");
            }
        });
        Arrays.sort(files, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
        for (File f : files) {
            return f.getAbsolutePath();
        }
        return "";
    }
}
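One thing worth noting about the spout above: `File.renameTo` returns a boolean, and `nextTuple()` ignores it. Since the topology runs with 5 spout executors, several spout instances can pick up the same `.txt` file at once; only one rename succeeds, yet every instance goes on to call `readFiles`. A minimal, Storm-independent sketch of rename-as-claim that honors the return value (`FileClaim` and `claim` are hypothetical names, not from the post):

```java
import java.io.File;
import java.io.IOException;

public class FileClaim {
    // Rename-as-claim: File.renameTo succeeds for exactly one caller,
    // so only the instance that wins the rename may read the file.
    public static File claim(File source) {
        File claimed = new File(source.getAbsolutePath().replace(".txt", ".Reading"));
        if (source.renameTo(claimed)) {
            return claimed;      // we own the file now
        }
        return null;             // another spout instance claimed it first
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("demo", ".txt");
        File first = claim(f);   // rename succeeds
        File second = claim(f);  // source is gone: rename fails, claim returns null
        System.out.println((first != null) + " " + (second == null)); // prints "true true"
        if (first != null) {
            first.delete();
        }
    }
}
```

Checking the boolean before reading means a losing instance simply skips the file instead of reading a path that another worker already owns.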
TestBolt.java
.............
public class TestBolt implements IRichBolt {
    OutputCollector collector;
    BufferedWriter writer;
    String filePath;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            System.out.println(stormConf.get("outFile").toString() + " TESTBOLT ********* "
                    + new Date() + " acker" + stormConf.get("topology.environment"));
            System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            writer = new BufferedWriter(new FileWriter(stormConf.get("outFile").toString(), true));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple input) {
        System.out.println("^^^^^^^^^^^^^^^^^^");
        System.out.println(" tuple data is :" + input.getString(0));
        try {
            collector.ack(input);
            collector.emit(new Values(input));
            writer.write(input.getString(0) + " \n");
            writer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup() {
        System.out.println(" inside clean up");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("data"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
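Separately, note the ordering inside `execute()` above: the tuple is acked before the write. Once a tuple is acked, Storm considers it fully processed and will never replay it, so a write that fails afterwards is silently lost. A Storm-independent sketch of the safer ordering, write first and acknowledge only on success (the `WriteThenAck` class and its `acks`/`fails` lists are illustrative stand-ins for `collector.ack`/`collector.fail`, not real Storm API):

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

public class WriteThenAck {
    public final List<String> acks = new ArrayList<>();
    public final List<String> fails = new ArrayList<>();

    // Mirrors the bolt's execute(), but the ack happens only after the
    // write (and flush) succeeded, so a failed write can be replayed.
    public void execute(String record, BufferedWriter writer) {
        try {
            writer.write(record + "\n");
            writer.flush();
            acks.add(record);      // ack only after a successful write
        } catch (IOException e) {
            fails.add(record);     // fail => the tuple gets replayed
        }
    }

    public static void main(String[] args) {
        WriteThenAck bolt = new WriteThenAck();
        StringWriter out = new StringWriter();
        bolt.execute("record-1", new BufferedWriter(out));
        System.out.println(bolt.acks + " " + out.toString().trim()); // prints "[record-1] record-1"
    }
}
```

With this ordering, an IOException routes the record to `fail` instead of leaving it acked-but-unwritten.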
For cluster mode, I run the application with:

storm jar /apps/apache-storm-0.9.3/lib/storm-parallelism-ex.jar com.jesus.storm.MainApplication 5 /FS/testFiles/stormfiles /FS/testFiles/result/result.txt MAIN 2

Here I used 5 executors and 2 workers; even when I increase the number of workers, nothing changes. Please help me.

Thank you for your valuable help 😊
Regards,
prasad.ch