Thanks, Nathan, for the quick reply. No bolt is failing here, but all the tuples are routed to the same worker. The remaining workers are not doing anything.
I guess something is wrong with the grouping we have implemented. Could you please help me achieve the scenario below?

1. KafkaSpout pulls one tuple from the queue.
2. The tuple is passed to a bolt that identifies whether it is a tweet, retweet, or reply tweet.
3. The tuple is then routed to a specific bolt (Tweet/ReTweet/ReplyTweet). That bolt inserts data into HBase and forwards the same tuple to the user bolt for further processing.
4. The user bolt inserts data into the HBase database, and so on.

Thanks & Regards,
Riyaz

On Thu, May 29, 2014 at 3:57 PM, Nathan Leung wrote:

> Which is the bolt that is failing? Your setup seems strange to me. You use
> fields grouping, and have three bolts that subscribe from a router bolt.
> But of the three fields, you set two to null. This means, I assume,
> that if you send a tweet, it will always get routed to the same retweet
> bolt, and the same reply bolt. Why not emit directly to three separate
> streams?
> On May 29, 2014 8:53 AM, "Shaikh Riyaz" wrote:
>
>> Hi All,
>>
>> Please help me to solve this problem.
>>
>> Still not, server is going down due to high load.
>>
>> Thanks & Regards,
>> Riyaz
>>
>>
>> On Thu, May 29, 2014 at 2:05 AM, Shaikh Riyaz wrote:
>>
>>> Here is the configuration.
>>> Config conf = new Config();
>>> conf.setNumWorkers(10);
>>> conf.setMaxSpoutPending(80000);
>>> conf.setMaxTaskParallelism(6);
>>> //conf.setDebug(true);
>>> conf.put(Config.NIMBUS_HOST, "x.x.x.x");
>>> conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
>>> conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 64 * 1024);
>>>
>>> Do I have to use shuffle grouping instead of fields grouping?
>>>
>>>
>>> On Thu, May 29, 2014 at 2:01 AM, P. Taylor Goetz wrote:
>>>
>>>> That seems okay. How many workers are you assigning to the topology?
>>>>
>>>> -Taylor
>>>>
>>>> On May 28, 2014, at 6:15 PM, Shaikh Riyaz wrote:
>>>>
>>>> Hi All,
>>>>
>>>> Thanks for your prompt reply.
>>>>
>>>> Please find the code below.
>>>>
>>>> ------------------------------------------------------------
>>>> [CODE]
>>>> builder.setSpout(spoutid, kafkaspout, 5);
>>>> builder.setBolt("tweetStream", new ActivityBolt(), 3).shuffleGrouping(spoutid);
>>>> builder.setBolt("tweetBolt", new HBStreamTweetBolt("Tweet"), 5)
>>>>     .fieldsGrouping("tweetStream", new Fields("tweet"));
>>>> builder.setBolt("reTweetBolt", new HBStreamReTweetBolt("ReTweet"), 5)
>>>>     .fieldsGrouping("tweetStream", new Fields("reTweet"));
>>>> builder.setBolt("replyTweetBolt", new HBStreamReplyTweetBolt("ReplyTweet"), 3)
>>>>     .fieldsGrouping("tweetStream", new Fields("replyTweet"));
>>>> builder.setBolt("twitterUserLBolt", new HBStreamUserBolt("Twitter_User"), 5)
>>>>     .fieldsGrouping("tweetBolt", new Fields("user"))
>>>>     .fieldsGrouping("reTweetBolt", new Fields("user"))
>>>>     .fieldsGrouping("replyTweetBolt", new Fields("user"));
>>>> builder.setBolt("tweetMediaURLBolt", new HBStreamTweetMediaURLBolt("TweetMediaURL"), 3)
>>>>     .fieldsGrouping("twitterUserLBolt", new Fields("tweetMediaURL"));
>>>> builder.setBolt("tweetHashTagsBolt", new HBStreamTweetHashtagsBolt("TweetHashtags"), 3)
>>>>     .fieldsGrouping("tweetMediaURLBolt", new Fields("tweetHashtags"));
>>>> builder.setBolt("tweetEntitiesURLBolt", new HBStreamTweetEntitiesURLBolt("TweetEntities_URL"), 3)
>>>>     .fieldsGrouping("tweetHashTagsBolt", new Fields("tweetEntitiesURL"));
>>>> builder.setBolt("tweetUsermentionURLBolt", new HBStreamTweetUserMentionBolt("TweetUserMention"), 3)
>>>>     .fieldsGrouping("tweetEntitiesURLBolt", new Fields("tweetUsermentionURL"));
>>>> [CODE]
>>>> ------------------------------------------------------------
>>>>
>>>> *Activity Bolt:*
>>>>
>>>> declarer.declare(new Fields("tweet","reTweet","replyTweet"));
>>>>
>>>> *Inside Execute:*
>>>> if (activity.getVerb().equalsIgnoreCase("post") && activity.getInReplyTo() == null) {
>>>>     //System.out.println("######### Emitting Tweet #######");
>>>>     this.collector.emit(new Values(activity,null,null));
>>>> } else if (activity.getVerb().equalsIgnoreCase("share")) {
>>>>     //System.out.println("######### Emitting ReTweet #######");
>>>>     this.collector.emit(new Values(null,activity,null));
>>>> } else if (activity.getVerb().equalsIgnoreCase("post") && activity.getInReplyTo() != null) {
>>>>     //System.out.println("######### Emitting ReplyTweet #######");
>>>>     this.collector.emit(new Values(null,null,activity));
>>>> }
>>>>
>>>> ------------------------------------------------------------
>>>>
>>>> *HBStreamTweet Bolt*
>>>>
>>>> declarer.declare(new Fields("user"));
>>>>
>>>> *Inside Execute method:*
>>>> this.collector.emit(new Values(activity));
>>>>
>>>> ------------------------------------------------------------
>>>>
>>>> Similarly, we are creating fields and emitting the activity in the other bolts.
>>>>
>>>> Is something wrong with the fields grouping we have implemented?
>>>>
>>>> Please suggest the best way to implement this.
>>>>
>>>> Thanks & Regards,
>>>> *Riyaz*
>>>>
>>>>
>>>> On Thu, May 29, 2014 at 1:04 AM, P. Taylor Goetz wrote:
>>>>
>>>>> Fields grouping uses a mod hash function to determine which task to
>>>>> send a tuple to.
>>>>>
>>>>> It sounds like there's not enough variety in the field values you are
>>>>> grouping on, such that they are all getting sent to the same task.
>>>>>
>>>>> Without seeing your code and data I can't tell for sure.
>>>>>
>>>>> -Taylor
>>>>>
>>>>> On May 28, 2014, at 5:56 PM, Shaikh Riyaz wrote:
>>>>>
>>>>> Hi All,
>>>>>
>>>>> We are running a Storm cluster with the following servers.
>>>>>
>>>>> One Nimbus
>>>>> Six supervisors with 2 workers each, running on ports 6700 and 6701.
>>>>>
>>>>> All tuples are going to only one supervisor, and only to one worker
>>>>> (6701) running on that supervisor.
>>>>>
>>>>> We have one KafkaSpout and 6 bolts processing the data. We are using
>>>>> fields grouping to pass tuples from one bolt to another. Each tuple saves
>>>>> some data to HBase.
>>>>>
>>>>> One of the executors has emitted 609180 tuples and the remaining executors
>>>>> have emitted 200 tuples in total.
>>>>>
>>>>> We have configured our spout and tuples with parallelism hint 5.
>>>>>
>>>>> Please let me know what might be wrong with the configuration.
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Riyaz

--
Regards,

Riyaz
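
Taylor's point about the mod hash, together with Nathan's observation that two of the three grouping fields are always null, can be illustrated in plain Java. The sketch below is a simplified model and not Storm's actual code: the class `FieldsGroupingSketch`, the method `taskFor`, and the sample keys are hypothetical, and it only assumes that the task is chosen by hashing the list of grouping values and taking the result modulo the number of tasks.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified model of "hash the grouping values, then mod by task count".
// Hypothetical names; this is NOT Storm's implementation.
class FieldsGroupingSketch {

    // Picks a task index in [0, numTasks) from the grouping field values.
    static int taskFor(List<Object> groupingValues, int numTasks) {
        // List.hashCode() counts a null element as 0, so a list that holds
        // only null always produces the same hash, and so the same task.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 5;

        // A tuple whose grouping field is null, e.g. the "reTweet" field
        // of a tuple emitted as new Values(activity, null, null).
        List<Object> nullKey = Collections.<Object>singletonList(null);
        for (int i = 0; i < 3; i++) {
            System.out.println("null key -> task " + taskFor(nullKey, numTasks));
        }

        // Keys with some variety (made-up user ids) can land on different tasks.
        for (String user : Arrays.asList("a", "b", "c", "d")) {
            List<Object> key = Collections.<Object>singletonList(user);
            System.out.println(user + " -> task " + taskFor(key, numTasks));
        }
    }
}
```

Under this model every tuple with a null grouping value is sent to one task, which would match the skew described in the thread. Nathan's alternative of separate streams would correspond, in Storm's API, to declaring named streams with `declarer.declareStream(streamId, fields)`, emitting with `collector.emit(streamId, values)`, and subscribing with `shuffleGrouping(componentId, streamId)`; the stream names would be the topology author's choice.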
