Re: Worker dies (bolt)
Hei

I have made a new test and discovered that in my environment a very simple bolt will die too, after around 2500 cycles. The bolt's code:

package storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class DummyBolt extends BaseBasicBolt
{
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        count++;
        System.out.println("Dummy count: " + count);
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
    }
}

After around 2500 cycles there is no more output from the execute method. What I do after this:

[root@dlvm2 sysconfig]# ls /var/lib/storm/workers/*/pids
[root@dlvm2 sysconfig]# kill -9 4179

After that a new worker comes up, works again for around 2500 cycles, then stops, and I have to kill the pid again. Any ideas?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"

On 02/06/14 13:36, Margusja wrote:
Hi

I am using apache-storm-0.9.1-incubating. I have a simple topology: a spout reads from a Kafka topic and a bolt writes the lines from the spout to HBase. Recently we did a test: we sent 300 000 000 messages over kafka-rest - kafka-queue - storm topology - hbase. I noticed that after around one hour and around 2500 messages the worker died. The PID is there and the process is up, but the bolt's execute method hangs.
The bolt's code is:

package storm;

import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HBaseWriterBolt extends BaseBasicBolt
{
    HTableInterface usersTable;
    HTablePool pool;
    int count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm37,vm38");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");
        //conf.set("zookeeper.znode.parent", "/hbase-unsecure");

        pool = new HTablePool(conf, 1);
        usersTable = pool.getTable("kafkademo1");
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector)
    {
        String line = tuple.getString(0);

        Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
        p.add(Bytes.toBytes("info"), Bytes.toBytes(line), Bytes.toBytes(line));

        try {
            usersTable.put(p);
            count++;
            System.out.println("Count: " + count);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        collector.emit(new Values(line));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }

    @Override
    public void cleanup()
    {
        try {
            usersTable.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
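For reference against the __ack_ack log lines later in the thread: BaseBasicBolt acks each input tuple automatically after execute() returns, which is what generates that ack traffic. Below is a minimal sketch of the same bolt written against BaseRichBolt, where anchoring and acking are explicit; the class name is illustrative, not from the thread.

package storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Illustrative sketch: the explicit-ack equivalent of the BaseBasicBolt above.
public class ExplicitAckBolt extends BaseRichBolt
{
    OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple)
    {
        String line = tuple.getString(0);
        // Anchor the emitted tuple to its input so a downstream failure replays it.
        collector.emit(tuple, new Values(line));
        // BaseRichBolt requires an explicit ack; BaseBasicBolt does this for you.
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("line"));
    }
}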
Re: Worker dies (bolt)
Some new information. I set debug to true, and in the active worker's log I can see the following.

While the worker is OK:

2014-06-03 11:04:55 b.s.d.task [INFO] Emitting: hbasewriter __ack_ack [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: hbasewriter:3, stream: __ack_ack, id: {}, [7197822474056634252 -608920652033678418]
2014-06-03 11:04:55 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack [7197822474056634252]
2014-06-03 11:04:55 b.s.d.executor [INFO] Processing received message source: KafkaConsumerSpout:1, stream: default, id: {4344988213623161794=-5214435544383558411}, my message...

And after the worker dies there are only rows about the spout, like:

2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout __ack_init [3399515592775976300 5357635772515085965 1]
2014-06-03 11:06:30 b.s.d.task [INFO] Emitting: KafkaConsumerSpout default

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
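The per-tuple log lines above come from Storm's debug mode. A minimal sketch of enabling it at submission time, assuming the wiring the thread implies: the KafkaConsumerSpout and hbasewriter names are taken from the logs, while the parallelism values and topology name are assumptions.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class DebugSubmit {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Wiring assumed from the thread: the Kafka spout feeds the HBase writer.
        builder.setSpout("KafkaConsumerSpout", new KafkaConsumerSpout(), 1);
        builder.setBolt("hbasewriter", new HBaseWriterBolt(), 1)
               .shuffleGrouping("KafkaConsumerSpout");

        Config conf = new Config();
        conf.setDebug(true); // log every emit/ack, as in the excerpts above

        StormSubmitter.submitTopology("kafkademo1", conf, builder.createTopology());
    }
}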
Re: Worker dies (bolt)
OK, got more info. It looks like the problem is related to the spout. I changed the spout's open method from:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "250");
    props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

to:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;

    Properties props = new Properties();
    props.put("zookeeper.connect", "vm24:2181,vm37:2181,vm38:2181");
    props.put("group.id", "testgroup");
    //props.put("zookeeper.session.timeout.ms", "500");
    //props.put("zookeeper.sync.time.ms", "250");
    //props.put("auto.commit.interval.ms", "1000");
    consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    this.topic = "kafkademo1";
}

and nextTuple from:

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            this.collector.emit(new Values(line), line);
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

to:

public void nextTuple()
{
    Map<String, Integer> topicCount = new HashMap<String, Integer>();
    // Define single thread for topic
    topicCount.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            // System.out.println("Message from the Topic ...");
            String line = new String(consumerIte.next().message());
            //this.collector.emit(new Values(line), line);
            this.collector.emit(new Values(line));
        }
    }
    if (consumer != null)
        consumer.shutdown();
}

And now it is running. Strange, because when the worker died I still saw log rows from the spout; I think it is related somehow to Storm's internals.

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
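The change that mattered is in the emit: emit(new Values(line), line) passes a message id, which anchors the tuple into Storm's ack cycle, while emit(new Values(line)) is untracked. A minimal sketch of what an anchored emit commits a spout to, and of why a blocking nextTuple is risky (nextTuple, ack, and fail all run on the same spout thread); the class, the queue, and the replay policy here are assumptions for illustration, not the thread's code.

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Illustrative sketch: a spout that emits anchored tuples must handle ack/fail,
// and nextTuple must not block, or this same thread never runs those callbacks.
public class AnchoredLineSpout extends BaseRichSpout
{
    SpoutOutputCollector collector;
    // Assumed: a separate thread fills this queue from the Kafka consumer.
    final LinkedBlockingQueue<String> lines = new LinkedBlockingQueue<String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String line = lines.poll(); // non-blocking: return at once if nothing is ready
        if (line != null) {
            // The second argument is the message id; Storm calls ack/fail with it.
            collector.emit(new Values(line), line);
        }
    }

    @Override
    public void ack(Object msgId) {
        // Tuple tree completed; safe to commit the corresponding offset here.
    }

    @Override
    public void fail(Object msgId) {
        // Tuple tree failed or timed out; re-queue for replay.
        lines.offer((String) msgId);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}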
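For completeness: when emits are anchored as above, Storm can also bound how many tuples are in flight per spout, so a slow ack path throttles the spout instead of piling up behind it. A sketch of the relevant Config knobs with arbitrary values; setMaxSpoutPending and setMessageTimeoutSecs are real Config setters, the surrounding class is illustrative.

import backtype.storm.Config;

public class PendingConfig {
    public static Config build() {
        Config conf = new Config();
        // Cap the number of un-acked tuples per spout task;
        // this only applies to emits that carry a message id.
        conf.setMaxSpoutPending(2000);
        // Replay any tuple that is not fully acked within this window.
        conf.setMessageTimeoutSecs(60);
        return conf;
    }
}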