If I understand it right, you have multiple bolt tasks writing to the same output file. Because your data set is quite small, I would assume that some tasks receive data later than others and overwrite the results of previously finished tasks.
If you ensure that all 5 bolt tasks are located on different hosts, it
should work.
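
Another option (just a sketch, untested in your setup) is to make the
output file unique per bolt task inside prepare(), e.g. by appending the
task id from TopologyContext, so that no two tasks ever append to the
same file:

  @Override
  public void prepare(Map stormConf, TopologyContext context,
          OutputCollector collector) {
      this.collector = collector;
      // per-task file name, e.g. result.txt.7, so parallel tasks
      // (even on the same host) never share one file
      String outFile = stormConf.get("outFile").toString()
              + "." + context.getThisTaskId();
      try {
          writer = new BufferedWriter(new FileWriter(outFile, true));
      } catch (IOException e) {
          e.printStackTrace();
      }
  }

You can concatenate the per-task files afterwards.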
Did you check the Storm UI to see
* whether multiple bolt tasks are deployed to the same node, and
* how many tuples are processed by each bolt task?
-Matthias
On 05/06/2015 01:31 PM, prasad ch wrote:
> Hi,
>
>
> I designed a sample Storm topology that contains a spout and a bolt.
> The spout reads from files (currently I have 10 files, each file
> having 1000 records) and sends the lines to the bolt.
> In the bolt, I process the received tuples by writing them into a file.
>
> In total my spout emits 10000 records, so my bolt should also receive
> 10000 tuples and write them into the file.
> The problem: in local mode everything is fine, but in cluster mode,
> with any number of workers, my bolt only receives or writes 5000 or
> 7000 records (the number varies randomly) into the file. I am losing
> data, please help me!
>
>
> Below is my application code.
> ----------------------------------------------
> MainApplication .java
> ......................................
> public class MainApplication {
> public static void main(String[] args) throws Exception {
> Config con = new Config();
> con.setDebug(true);
> int i = Integer.parseInt(args[0]);
> con.put("fileLocation", args[1]);
> con.put("outFile", args[2]);
> con.put("ext", ".txt");
> con.setNumWorkers(Integer.parseInt(args[4]));
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("FileREADING-SPOUT", new FileReadingSpout(), i);
> builder.setBolt("TEST-BOLT", new TestBolt(), i).shuffleGrouping("FileREADING-SPOUT");
> StormSubmitter.submitTopology(args[3], con, builder.createTopology());
> }
> }
>
>
> FileReadingSpout.java
> ..................................
>
>
> public class FileReadingSpout implements IRichSpout {
> static int count=0;
> long windowTime;
> SpoutOutputCollector collector;
> TopologyContext context;
> String fileLocation, ext;
> @Override
> public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
> try {
>
> System.out.println("********************************************************************");
> System.out.println(" FileReadingSpout ********* " + new Date() + " acker" + conf.get("TOPOLOGY_MESSAGE_TIMEOUT_SECS"));
> System.out.println("********************************************************************");
>
> this.context = context;
> this.collector = collector;
> this.fileLocation = conf.get("fileLocation").toString();
> this.ext = conf.get("ext").toString();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
>
> @Override
> public void close() {
> }
>
> @Override
> public void activate() {
> }
>
> @Override
> public void deactivate() {
> }
>
> @Override
> public void nextTuple() {
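> // each call claims the oldest unread .txt file (renaming it to
> // .Reading) and emits all of its lines; note that renameTo's return
> // value is never checked, so two spout tasks could race for a file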
> String line;
> try {
> System.out.println(Thread.currentThread().getName() + "in side ReadingSpout ...*" + new Date());
> String l = getFileNames(fileLocation, ext);
> if (l.length() > 0) {
> System.out.println("** list of files count:" + l);
> File oldName = new File(l);
> String file = l.replace(".txt", ".Reading");
> File newName = new File(file);
> oldName.renameTo(newName);
> readFiles(file);
> }
> } catch (Exception e) {
> e.printStackTrace();
> }
>
> }
>
> @Override
> public void ack(Object msgId) {
> System.out.println(count+" In side ack ::* " + msgId);
> }
>
> @Override
> public void fail(Object msgId) {
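> // note: fail() does nothing, so a tuple that fails or times out is
> // never re-emitted and its line is lost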
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>
> declarer.declare(new Fields("line"));
>
> }
>
> @Override
> public Map<String, Object> getComponentConfiguration() {
> return null;
> }
>
> public static long stringToLong(String st) {
> long result = 0L;
> try {
> if (st != null) {
> SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
> Date d = sdf.parse(st);
> result = d.getTime();
> }
> } catch (Exception e) {
> e.printStackTrace();
> }
> return result;
> }
>
> public void readFiles(String fileName) {
> String data = null;
> String arr[];
> BufferedReader reader = null;
> try {
> reader = new BufferedReader(new FileReader(fileName));
>
> System.out.println("=====================*====================");
> System.out.println(" current reading fileName is :" +
> fileName);
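> // note: the loop below stops at the first empty line, so records
> // after a blank line in a file are silently skipped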
> while ((data = reader.readLine()) != null && data.length()>0) {
> count++;
> collector.emit(new Values(data),data);
> }
>
> } catch (IOException e) {
> System.out.println(" file Not found :" + e);
> } finally {
> try {
> if (reader != null) {
> reader.close();
> System.out.println(" reader is closed successfully !");
> }
> } catch (IOException e) {
> System.out.println("reader is not closed properly .." + e);
> }
> }
>
> }
>
> public String getFileNames(String folderpath, String extention) {
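> // despite building a List, this returns only the path of the oldest
> // matching file (or "" if none); the fileNames list is never used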
> List<String> fileNames = new ArrayList();
>
> File file = new File(folderpath);
> final String ext = extention;
> File[] files = file.listFiles(new FilenameFilter() {
> @Override
> public boolean accept(File dir, String name) {
> return name.toLowerCase().endsWith(ext)
> && !name.contains("ReadingComplete");
> }
> });
> Arrays.sort(files,
> LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
> for (File f : files) {
> return f.getAbsolutePath();
> }
> return "";
> }
>
> }
>
> TestBolt.java
> ..........................
>
> public class TestBolt implements IRichBolt {
> OutputCollector collector;
> BufferedWriter writer;
> String filePath;
> @Override
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> this.collector = collector;
> try {
>
> System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
> System.out.println(stormConf.get("outFile").toString()+"
> TESTBOLT ********* "+new Date()+"
> acker"+stormConf.get("topology.environment"));
>
> System.out.println("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
>
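> // NOTE: every TestBolt task opens the same outFile for appending, so
> // with parallelism > 1 several tasks (possibly on different hosts)
> // write to one file concurrently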
> writer = new BufferedWriter(new FileWriter(stormConf.get("outFile").toString(), true));
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
>
> @Override
> public void execute(Tuple input) {
> System.out.println("^^^^^^^^^^^^^^^^^^");
> System.out.println(" tuple data is :" + input.getString(0));
> try {
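> // note: the tuple is acked before the write happens, and
> // new Values(input) nests the whole input tuple as a single field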
> collector.ack(input);
> collector.emit(new Values(input));
> writer.write(input.getString(0) + " \n");
> writer.flush();
>
> } catch (Exception e) {
> e.printStackTrace();
> }
>
> }
> @Override
> public void cleanup() {
> System.out.println(" in side clean up");
> }
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("data"));
> }
> @Override
> public Map<String, Object> getComponentConfiguration() {
> return null;
> }
> }
>
>
>
>
> To run in cluster mode I execute the app as follows:
>
> storm jar /apps/apache-storm-0.9.3/lib/storm-parallelism-ex.jar
> com.jesus.storm.MainApplication 5 /FS/testFiles/stormfiles
> /FS/testFiles/result/result.txt MAIN 2
>
>
> In the above I used 5 executors and 2 workers; even when I increase
> the number of workers, nothing changes.
> Please help me!
>
>
>
>
> THANK YOU FOR YOUR VALUABLE HELP!
>
> Regards,
> prasad.ch
>