Hi,
I designed a sample topology that contains one spout and two bolts.
My spout reads a list of files and emits tuples. In nextTuple() I wrote logic that picks a
file and keeps emitting its lines until it reaches the end of that file. Is this approach
correct? If anything is wrong, please help me. Is it possible for a spout to read a list of
files, that is, given a specified location that may contain any number of files, each with
any number of records?
I submit the topology with 5 executors for the spout, 5 executors for bolt1,
5 executors for bolt2, and 5 workers, on a Storm cluster of 3 machines: one runs
Nimbus and the other two are supervisors.
With this example I cannot get acceptable performance: processing 10 files (100 MB
each) is very slow.
Please point out any setting I forgot to configure in the topology.
FileReadingSpout .java----------------------------------public class
FileReadingSpout implements IRichSpout {
//BufferedReader reader; static int count = 0; SpoutOutputCollector
collector; TopologyContext context; String fileLocation, ext;
@Override public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) { try { //this.reader = new
BufferedReader(new FileReader("D://abc1.txt")); System.out.println("
Worker Port :-> " + context.getThisWorkerPort());
System.out.println("********************************************************************");
System.out.println(" FileReadingSpout ********* " + new Date() + "
currentmillseconds --> :" + System.currentTimeMillis());
System.out.println("********************************************************************");
this.context = context; this.collector = collector;
this.fileLocation = conf.get("fileLocation").toString();
this.ext = conf.get("ext").toString(); } catch (Exception e) {
e.printStackTrace(); } }
@Override public void close() { }
@Override public void activate() { }
@Override public void deactivate() { }
@Override public void nextTuple() { String line; try {
System.out.println(Thread.currentThread().getName() + "in side
ReadingSpout ...*" + new Date() + " currentymillseconds --> " +
System.currentTimeMillis());// while ((line = reader.readLine()) !=
null) {// if (validateTuple(line)) {//
System.out.println(" data reading :" + line);//
this.collector.emit(new Values(line));// }// }
System.out.println(" context :--> getThisWorkerPort* " +
context.getThisWorkerPort()); System.out.println(" context :-->
getThisWorkerTasks * " + context.getThisWorkerTasks() + " current
getThisWorkerTasks " + context.getThisTaskId());
String l = getFileNames(fileLocation, ext); if
(l.length() > 0) { System.out.println("** list of files count:"
+ l); File oldName = new File(l); String file =
l.replace(".txt", ".Reading"); File newName = new File(file);
oldName.renameTo(newName); readFiles(file);
} } catch (Exception e) { e.printStackTrace(); }
}
@Override public void ack(Object msgId) {
System.out.println(count + " In side ack ::* " + msgId); }
@Override public void fail(Object msgId) { System.out.println("
In side fail ::* " + msgId);
}
@Override public void declareOutputFields(OutputFieldsDeclarer declarer)
{
declarer.declare(new Fields("feed"));
}
@Override public Map<String, Object> getComponentConfiguration() {
return null; }
public static long stringToLong(String st) { long result = 0L;
try { if (st != null) { SimpleDateFormat sdf = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date d = sdf.parse(st);
result = d.getTime(); } } catch (Exception e)
{ e.printStackTrace(); } return result; }
public void readFiles(String fileName) { String data = null;
String arr[]; BufferedReader reader = null; try {
reader = new BufferedReader(new FileReader(fileName));
System.out.println(" current reading fileName is :" + fileName);
while ((data = reader.readLine()) != null && data.length() > 0) {
collector.emit(new Values(data),data);
}
} catch (IOException e) { System.out.println(" file Not
found :" + e); } finally { try { if (reader !=
null) { reader.close();
System.out.println(" reader is closed successfully !"); }
} catch (IOException e) { System.out.println("reader is not
closed properly .." + e); } }
}
public String getFileNames(String folderpath, String extention) {
List<String> fileNames = new ArrayList();
File file = new File(folderpath); final String ext = extention;
File[] files = file.listFiles(new FilenameFilter() { @Override
public boolean accept(File dir, String name) { if
(name.toLowerCase().endsWith(ext) && !name.contains("ReadingComplete")) {
return true; } else { return
false; } } }); Arrays.sort(files,
LastModifiedFileComparator.LASTMODIFIED_COMPARATOR); for (File f :
files) { return f.getAbsolutePath();//
fileNames.add(f.getAbsolutePath()); } return ""; }
}
My main application,
storm.java:
public class storm {
/** * @param args the command line arguments */ public static
Properties objProperties = null;
public static void submitTopology(String propFileLoc, String topologyName)
throws Exception { String spout = "FILE_READING_SPOUT", bolt =
"MOU_BOLT"; Config con = new Config(); con.setDebug(true);//
con.put("fileLocation", "D:\\strom\\inputFiles");//
//con.put("outFile",".txt");// con.put("outFile",
"D:/strom/inputFiles/result/CallDurationGroupingBolt.txt");//
con.put("outFile1","D:/strom/inputFiles/result/EachCallDurationBolt.txt");//
objProperties = new Properties(); objProperties.load(new
FileReader(propFileLoc)); System.out.println(" PROPERTIES ARE LOADED "
+ objProperties.size());
con.put("fileLocation", objProperties.get("inputFileLoc"));
con.put("outFile", objProperties.get("outputFileLoc"));
//con.put("windowTimeInMS",3600000); con.put("callDurationValInSec",
Integer.parseInt(objProperties.get("callDuartionValInSec").toString()));
con.put("ext", ".txt");
con.setNumWorkers(Integer.parseInt(objProperties.get("NumberOfWorkers").toString()));
int eB =
Integer.parseInt(objProperties.get("ExecutorsForBolt").toString()); int
eS = Integer.parseInt(objProperties.get("ExecutorsForSpout").toString());
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(spout, new FileReadingSpout(), 10);
builder.setBolt(bolt, new
CallDurationGroupingBolt(Integer.parseInt(objProperties.get("windowSizeInSec").toString()),
Integer.parseInt(objProperties.get("SlidingTimeInSec").toString())),
10).fieldsGrouping(spout, new Fields("feed")); builder.setBolt("MOU",
new
EachCallDurationBolt(Integer.parseInt(objProperties.get("windowSizeInSec").toString())),
10).shuffleGrouping(spout); if (topologyName != null &&
topologyName.length() > 0) { System.out.println(" SUBMITTING
TOPOLOGY INTO STORM CLUSTER ...");
StormSubmitter.submitTopology(topologyName, con, builder.createTopology());
} else { LocalCluster cluster = new LocalCluster();
cluster.submitTopology("MOU_TOPOLOGY", con, builder.createTopology()); }
}
public static void main(String[] args) { // TODO code application
logic here try { String fl=args[0]; String
tn=args[1]; if (tn.equals("1")) {
System.out.println("SUBMITTED IN LOCAL CLUSTER MODE");
submitTopology(fl, null); } else { submitTopology(fl,
tn);
} } catch (Exception e) { e.printStackTrace();
} }
}
Thanks
Prasad.ch