Here is the configuration:

Config conf = new Config();
conf.setNumWorkers(10);
conf.setMaxSpoutPending(80000);
conf.setMaxTaskParallelism(6);
//conf.setDebug(true);
conf.put(Config.NIMBUS_HOST, "x.x.x.x");
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 64 * 1024);
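
If I have understood the earlier explanation correctly, fields grouping picks the target task by hashing the grouped field values and taking the result modulo the number of target tasks. Here is a rough illustration of that idea (only my sketch, not Storm's actual code; the class and method names are made up):

[CODE]
// Rough illustration of the "mod hash" idea only -- not Storm's actual code.
// The class and method names here are made up for the example.
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingSketch {

    // Hash the grouped field values and map them onto one of the target tasks.
    static int chooseTask(List<Object> groupedValues, int numTasks) {
        int hash = Arrays.deepHashCode(groupedValues.toArray());
        return Math.floorMod(hash, numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 5; // e.g. a bolt with parallelism hint 5

        // Tuples whose grouped field carries the same value (for example null,
        // because we emit new Values(activity, null, null)) always map to the
        // same task index, so one executor ends up doing all the work.
        System.out.println(chooseTask(Arrays.asList((Object) null), numTasks));     // same index every time
        System.out.println(chooseTask(Arrays.asList((Object) null), numTasks));     // same index every time
        System.out.println(chooseTask(Arrays.asList((Object) "user_42"), numTasks));
    }
}
[CODE]

If that is right, then with new Values(activity, null, null) most tuples carry null in the field a given bolt groups on, so they would all land on one task of that bolt.
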
Do I have to use shuffle grouping instead of fields grouping?

On Thu, May 29, 2014 at 2:01 AM, P. Taylor Goetz <[email protected]> wrote:

> That seems okay. How many workers are you assigning to the topology?
>
> -Taylor
>
> On May 28, 2014, at 6:15 PM, Shaikh Riyaz <[email protected]> wrote:
>
> Hi All,
>
> Thanks for your prompt reply.
>
> Please find the code below.
>
> ----------------------------------------
> [CODE]
> builder.setSpout(spoutid, kafkaspout, 5);
> builder.setBolt("tweetStream", new ActivityBolt(), 3)
>     .shuffleGrouping(spoutid);
> builder.setBolt("tweetBolt", new HBStreamTweetBolt("Tweet"), 5)
>     .fieldsGrouping("tweetStream", new Fields("tweet"));
> builder.setBolt("reTweetBolt", new HBStreamReTweetBolt("ReTweet"), 5)
>     .fieldsGrouping("tweetStream", new Fields("reTweet"));
> builder.setBolt("replyTweetBolt", new HBStreamReplyTweetBolt("ReplyTweet"), 3)
>     .fieldsGrouping("tweetStream", new Fields("replyTweet"));
> builder.setBolt("twitterUserLBolt", new HBStreamUserBolt("Twitter_User"), 5)
>     .fieldsGrouping("tweetBolt", new Fields("user"))
>     .fieldsGrouping("reTweetBolt", new Fields("user"))
>     .fieldsGrouping("replyTweetBolt", new Fields("user"));
> builder.setBolt("tweetMediaURLBolt", new HBStreamTweetMediaURLBolt("TweetMediaURL"), 3)
>     .fieldsGrouping("twitterUserLBolt", new Fields("tweetMediaURL"));
> builder.setBolt("tweetHashTagsBolt", new HBStreamTweetHashtagsBolt("TweetHashtags"), 3)
>     .fieldsGrouping("tweetMediaURLBolt", new Fields("tweetHashtags"));
> builder.setBolt("tweetEntitiesURLBolt", new HBStreamTweetEntitiesURLBolt("TweetEntities_URL"), 3)
>     .fieldsGrouping("tweetHashTagsBolt", new Fields("tweetEntitiesURL"));
> builder.setBolt("tweetUsermentionURLBolt", new HBStreamTweetUserMentionBolt("TweetUserMention"), 3)
>     .fieldsGrouping("tweetEntitiesURLBolt", new Fields("tweetUsermentionURL"));
> [CODE]
> ----------------------------------------
>
> *Activity Bolt:*
>
> declarer.declare(new Fields("tweet", "reTweet", "replyTweet"));
>
> *Inside Execute:*
>
> if (activity.getVerb().equalsIgnoreCase("post") && activity.getInReplyTo() == null) {
>     //System.out.println("######### Emitting Tweet #######");
>     this.collector.emit(new Values(activity, null, null));
> } else if (activity.getVerb().equalsIgnoreCase("share")) {
>     //System.out.println("######### Emitting ReTweet #######");
>     this.collector.emit(new Values(null, activity, null));
> } else if (activity.getVerb().equalsIgnoreCase("post") && activity.getInReplyTo() != null) {
>     //System.out.println("######### Emitting ReplyTweet #######");
>     this.collector.emit(new Values(null, null, activity));
> }
>
> ----------------------------------------
>
> *HBStreamTweet Bolt:*
>
> declarer.declare(new Fields("user"));
>
> *Inside Execute method:*
>
> this.collector.emit(new Values(activity));
>
> ----------------------------------------
>
> Similarly, the other bolts declare their fields and emit the activity.
>
> Is something wrong with the fields grouping we have implemented?
>
> Please suggest the best way to implement this.
>
> Thanks & Regards,
> *Riyaz*
>
>
> On Thu, May 29, 2014 at 1:04 AM, P. Taylor Goetz <[email protected]> wrote:
>
>> Fields grouping uses a mod hash function to determine which task to send
>> a tuple to.
>>
>> It sounds like there's not enough variety in the field values you are
>> grouping on, such that they are all getting sent to the same task.
>>
>> Without seeing your code and data I can't tell for sure.
>>
>> -Taylor
>>
>> On May 28, 2014, at 5:56 PM, Shaikh Riyaz <[email protected]> wrote:
>>
>> Hi All,
>>
>> We are running a Storm cluster with the following servers:
>>
>> One Nimbus.
>> Six supervisors with 2 workers each, running on ports 6700 and 6701.
>>
>> All tuples are going to only one supervisor, and only to one worker (6701)
>> running on that supervisor.
>>
>> We have one KafkaSpout and 6 bolts processing the data. We are using
>> fields grouping to pass tuples from one bolt to another. Each tuple results
>> in some data being saved to HBase.
>>
>> One of the executors has emitted 609180 tuples, while the remaining
>> executors have emitted 200 tuples in total.
>>
>> We have configured our spout and bolts with a parallelism hint of 5.
>>
>> Please let me know what might be wrong with the configuration.
>>
>> Thanks in advance.
>>
>>
>> --
>> Regards,
>>
>> Riyaz
>
>
> --
> Regards,
>
> Riyaz

--
Regards,

Riyaz
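
P.S. For reference, the shuffle grouping variant I am asking about above would only change the grouping call in the wiring, for example (just a sketch against the builder code quoted above, keeping the same component names and parallelism hints):

[CODE]
// Sketch: the same "tweetBolt" wiring as in the quoted builder code, but with
// shuffle grouping, so tuples from "tweetStream" are distributed randomly and
// roughly evenly across the 5 tweetBolt tasks instead of being hashed on the
// "tweet" field.
builder.setBolt("tweetBolt", new HBStreamTweetBolt("Tweet"), 5)
    .shuffleGrouping("tweetStream");
[CODE]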
