Somewhere in your code you are starting far too many threads (thousands of them). I don't see that in the code you posted, so it must be in one of the classes you haven't posted.
Are you using multithreading anywhere? Are you instantiating services that spawn threads (like network clients)? If you can't tell where they come from, look at a thread dump to see what these threads are. You can get one with a profiler, a debugger, jvisualvm or jstack, or by sending the process a `kill -3` (SIGQUIT) signal. Despite the command's name, this does not terminate the JVM; it makes the JVM print a thread dump to stdout. On Sun, Jun 7, 2015 at 10:22 AM, Michail Toutoudakis <[email protected]> wrote: > I am trying to read some data from text file and process them. I am > currently using scanner. In the beginning everything works fine for the > first 10000 values and then it looks like no other input lines are sent to > the bolt that implements the algorithm. Finally after a few minutes of run > i get java.lang.OutOfMemoryError: unable to create new native thread > > The file reader spout implementation is: > > package tuc.LSH.storm.spouts; > > import backtype.storm.spout.SpoutOutputCollector; > import backtype.storm.task.TopologyContext; > import backtype.storm.topology.IRichSpout; > import backtype.storm.topology.OutputFieldsDeclarer; > import backtype.storm.topology.base.BaseRichSpout; > import backtype.storm.tuple.Fields; > import backtype.storm.tuple.Values; > import backtype.storm.utils.Utils; > import tuc.LSH.conf.Consts; > > import javax.rmi.CORBA.Util; > import java.io.*; > import java.util.Map; > import java.util.Scanner; > > /** > * Created by mixtou on 15/5/15. 
> */ > public class FileReaderSpout extends BaseRichSpout { > //public class FileReaderSpout implements IRichSpout { > > private SpoutOutputCollector collector; > private Scanner scanner; > private boolean completed; > private TopologyContext context; > private int spout_idx; > private int spout_id; > private Map config; > private int noOfFailedWords; > private int noOfAckedWords; > > @Override > public void declareOutputFields(OutputFieldsDeclarer > outputFieldsDeclarer) { > outputFieldsDeclarer.declareStream("data", new Fields("streamId", > "timestamp", "value")); > > > } > > @Override > public void open(Map config, TopologyContext topologyContext, > SpoutOutputCollector spoutOutputCollector) { > this.context = topologyContext; > this.spout_idx = context.getThisTaskIndex(); > this.spout_id = context.getThisTaskId(); > this.collector = spoutOutputCollector; > this.config = config; > this.completed = false; > this.noOfFailedWords = 0; > this.noOfAckedWords = 0; > > try { > this.scanner = new Scanner(new > File(config.get(file_to_read()).toString())); > System.err.println("Scanner Reading File: " + > config.get(file_to_read()).toString() + " Spout index: " + spout_idx); > } catch (FileNotFoundException e) { > e.printStackTrace(); > } > > } > > @Override > public void nextTuple() { > > if(!completed) { > if (scanner.hasNextLine()) { > String[] temp = scanner.nextLine().split(","); > // System.err.println("============== " + temp[0] + " + " + > temp[2] + " + " + temp[3]); //0-id,2-timestamp,3-value > collector.emit("data", new Values(temp[0], temp[2], temp[3]), > temp[0]); //emmit the correct data to next bolt without guarantee delivery > Utils.sleep(1); > } else { > System.err.println("End of File Closing Reader"); > scanner.close(); > completed = true; > } > } > > } > > private String file_to_read() { > // this.spout_id = context.getThisTaskId(); > if (Consts.NO_OF_SPOUTS > 1) { > int file_no = spout_idx % Consts.NO_OF_SPOUTS; > return "data" + file_no; > } else { > 
return "data"; > } > } > > @Override > public void ack(Object msgId) { > super.ack(msgId); > noOfAckedWords++; > // System.out.println("OK tuple acked from bolt: " + msgId + " no of > acked word " + noOfAckedWords); > System.out.println("no of acked tuples: "+noOfAckedWords); > } > > @Override > public void fail(Object msgId) { > super.fail(msgId); > noOfFailedWords++; > System.err.println("ERROR: " + context.getThisComponentId() + " " + > msgId + " no of words failed " + noOfFailedWords); > > } > > } > > And the bolt that gets the tuples from file reader spout and process them > is: > > package tuc.LSH.storm.bolts; > > import backtype.storm.task.OutputCollector; > import backtype.storm.task.TopologyContext; > import backtype.storm.topology.OutputFieldsDeclarer; > import backtype.storm.topology.base.BaseRichBolt; > import backtype.storm.tuple.Fields; > import backtype.storm.tuple.Tuple; > import backtype.storm.tuple.Values; > import backtype.storm.utils.Utils; > import sun.jvm.hotspot.runtime.*; > import tuc.LSH.core.hashfunctions.HashFunctionsGen; > import tuc.LSH.core.timeseries.UniversalBasicWindow; > > > import java.lang.Thread; > import java.util.*; > > /** > * Created by mixtou on 17/5/15. 
> */ > public class LSHBolt extends BaseRichBolt { > private int task_id; > private OutputCollector collector; > private UniversalBasicWindow universalBasicWindow; > > private String streamId; > private String time; > private Float value; > > @Override > public void prepare(Map conf, TopologyContext topologyContext, > OutputCollector outputCollector) { > this.task_id = topologyContext.getThisTaskIndex(); > this.collector = outputCollector; > this.universalBasicWindow = new UniversalBasicWindow(); > streamId = null; > time = null; > value = 0f; > System.err.println("New Bolt with id: " + task_id); > > } > > @Override > public void execute(Tuple tuple) { > > if (tuple.getSourceStreamId().equals("sync")) { > System.out.println("Bolt task id " + task_id + " received from " > + tuple.getSourceComponent() + " message " + tuple.getString(0)); > System.out.println("Normalizing: Basic Window of Bolt " + > task_id); > universalBasicWindow.normalize(); //fill the rest of the streams > with last received value to make them same size > universalBasicWindow = null; > universalBasicWindow = new UniversalBasicWindow(); > // Utils.sleep(1); > } > > if (tuple.getSourceStreamId().equals("data")) { > > streamId = tuple.getStringByField("streamId"); > time = tuple.getStringByField("timestamp"); > value = Float.parseFloat(tuple.getStringByField("value")); > > universalBasicWindow.pushStream(streamId, value); > > if (universalBasicWindow.isFull(task_id)) { //check if any stream > of the window is full > > // System.out.println("Univ. 
Basic Window of bolt " + task_id > + " is Filled Up"); > > collector.emit("bwFilled", new Values(task_id)); > Utils.sleep(1); > > // universalBasicWindow.normalize(); > // universalBasicWindow = new UniversalBasicWindow(); > > // TODO:: Add basic window to sliding window and clear > > } > > } > > > // System.err.println("SourceComponent: "+tuple.getSourceComponent()); > // System.err.println("SourceStreamId: "+tuple.getSourceStreamId()); > // System.err.println("Source Task: "+tuple.getSourceTask()); > // System.err.println("SourceGlobalStreamId: > "+tuple.getSourceGlobalStreamid()); > // System.err.println("MessageId: "+tuple.getMessageId()); > > collector.ack(tuple); > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer > outputFieldsDeclarer) { > outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id")); > > } > } > > File reader spout stops acking values by the time that > universalBasicWindow is full > > The Bolt that gets the stream “bwfilled” is just a bolt that sends reset > back to LSHBolt so that all instances will reset basicWindow. > > ResetBolt implementation is: > > package tuc.LSH.storm.bolts; > > import backtype.storm.task.OutputCollector; > import backtype.storm.task.TopologyContext; > import backtype.storm.topology.OutputFieldsDeclarer; > import backtype.storm.topology.base.BaseRichBolt; > import backtype.storm.tuple.Fields; > import backtype.storm.tuple.Tuple; > import backtype.storm.tuple.Values; > import backtype.storm.utils.Utils; > import tuc.LSH.conf.Consts; > import tuc.LSH.core.hashfunctions.HashFunctionsGen; > > import javax.rmi.CORBA.Util; > import java.util.HashSet; > import java.util.Map; > import java.util.Set; > > /** > * Created by mixtou on 2/6/15. 
> */ > public class ResetBolt extends BaseRichBolt { > > private int task_id; > private OutputCollector collector; > // private int noOfFilledBoltsInstaces; > private Set<Integer> filledBoltInstances; > > @Override > public void prepare(Map map, TopologyContext topologyContext, > OutputCollector outputCollector) { > > this.task_id = topologyContext.getThisTaskIndex(); > this.collector = outputCollector; > this.filledBoltInstances = new HashSet<>(); > HashFunctionsGen.generateHashFunctionsForUBW(); > > } > > @Override > public void execute(Tuple tuple) { > > if (tuple.getSourceStreamId().equals("bwFilled")) { > // System.err.println("Reset Bolt Received window filled from bolt > with task id " + tuple.getInteger(0)); > // filledBoltInstances.add(tuple.getInteger(0)); > > //GENERATE NEW HASH FUNCTIONS WHEN ALL BASIC WINDOW INSTANCES ARE > FILLED?????? > // if (filledBoltInstances.size() == Consts.NO_OF_LSH_BOLTS) { > // filledBoltInstances.clear(); > //// System.out.println("Updating Basic Window Hash > Functions"); > //// HashFunctionsGen.clear(); > //// HashFunctionsGen.generateHashFunctionsForUBW(); > // > // } > collector.emit("sync", new Values("Reset")); > // Utils.sleep(1); > // System.err.println("Emitted ResetBW to all LSH Bolt Instances"); > collector.ack(tuple); > } > > } > > @Override > public void declareOutputFields(OutputFieldsDeclarer > outputFieldsDeclarer) { > outputFieldsDeclarer.declareStream("sync", new Fields("message")); > } > } > > Finally the topology implementation is: > > package tuc.LSH.storm.topologies; > > import backtype.storm.Config; > import backtype.storm.LocalCluster; > import backtype.storm.StormSubmitter; > import backtype.storm.generated.AlreadyAliveException; > import backtype.storm.generated.Bolt; > import backtype.storm.generated.InvalidTopologyException; > import backtype.storm.topology.BoltDeclarer; > import backtype.storm.topology.TopologyBuilder; > import backtype.storm.tuple.Fields; > import backtype.storm.utils.Utils; > 
import tuc.LSH.conf.Consts; > import tuc.LSH.core.timeseries.UniversalBasicWindow; > import tuc.LSH.storm.bolts.LSHBolt; > import tuc.LSH.storm.bolts.ResetBolt; > import tuc.LSH.storm.spouts.FileReaderSpout; > > import java.util.Map; > > /** > * Created by mixtou on 13/5/15. > */ > public class LSHTopology { > > public static void main(String[] args) throws Exception{ > > TopologyBuilder builder = new TopologyBuilder(); > > // builder.setSpout("RandomStreamSpout", new RandomStreamSpout(), > Consts.NO_OF_SPOUTS); > builder.setSpout("FileReaderSpout", new FileReaderSpout(), > Consts.NO_OF_SPOUTS); > BoltDeclarer lshBolt = builder.setBolt("LSH", new LSHBolt(), > Consts.NO_OF_LSH_BOLTS) > .fieldsGrouping("FileReaderSpout", "data", new > Fields("streamId")); > builder.setBolt("ResetBolt", new ResetBolt(), > 1).shuffleGrouping("LSH", "bwFilled"); > > lshBolt.allGrouping("ResetBolt", "sync"); > > Config config = new Config(); > > for(Map.Entry<String, String> entry : Consts.data_files.entrySet()){ > config.put(entry.getKey(), entry.getValue()); > } > > config.setDebug(false); > config.setFallBackOnJavaSerialization(false); > // config.registerSerialization(UniversalBasicWindow.class); > > > > if (args != null && args.length > 0) { > try { > StormSubmitter.submitTopology(args[0], config, > builder.createTopology()); > } catch (AlreadyAliveException e) { > e.printStackTrace(); > } catch (InvalidTopologyException e) { > e.printStackTrace(); > } > } else { > LocalCluster cluster = new LocalCluster(); > cluster.submitTopology("LSHTopology", config, > builder.createTopology()); > > Thread.sleep(500000); > Utils.sleep(500000); > cluster.killTopology("LSHTopology"); > cluster.shutdown(); > > } > > } > > } > > >
