Hi, I prefer all answers to be sent via e-mail, as I'm working at a customer's office and it would not be polite to them for me to be on the phone.
On Wed, Jan 21, 2015 at 1:25 PM, Nathan Leung <[email protected]> wrote:
> The number of fields you declare must match the number of fields emitted in each tuple. Also, if you are grouping by a field name, then yes, you must declare a field with that name. So in this case you should declare one field named X. Alternatively, you can change your topology so the fields grouping is done on "bytes" and not "X".
> On Jan 21, 2015 8:22 AM, "Kushan Maskey" <[email protected]> wrote:
>
>> So are you saying that instead of declaring "bytes" in declareOutputFields, we should declare "X"? I tried to have both fields there, but Storm complained about the number of declared fields.
>> On Jan 21, 2015 6:53 AM, Nathan Leung <[email protected]> wrote:
>>
>> If this is bolt 1, you are doing fieldsGrouping on field X, but none of your output fields are named X.
>> On Jan 20, 2015 8:37 PM, "Kushan Maskey" <[email protected]> wrote:
>>
>>> The bolt is pretty simple: it gets the message and then updates the data as I explained in the earlier email.
>>>
>>> @Override
>>> public synchronized void execute(Tuple input, BasicOutputCollector collector) {
>>>     MyDomainClass domainClass = (MyDomainClass) input.getValueByField("bytes");
>>>     if (domainClass != null) {
>>>         boolean success = controller.myDataUpdateFunction(domainClass);
>>>         if (success) {
>>>             collector.emit(new Values(domainClass));
>>>         }
>>>     }
>>> }
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>     declarer.declare(new Fields("bytes"));
>>> }
>>>
>>> Does it look like I am doing this correctly? I am sure there is something I am not doing correctly. LMK.
>>>
>>>
>>> --
>>> Kushan Maskey
>>>
>>> On Tue, Jan 20, 2015 at 4:33 PM, Nathan Leung <[email protected]> wrote:
>>>
>>>> I assume Bolt2 in the snippet is the bolt in question? What do declareOutputFields and emit in Bolt1 look like? Are you able to show the logic of Bolt2?
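The two rules in the replies above (one emitted value per declared field, and the grouping field must be one of the declared fields) can be illustrated with a small, self-contained Java model. This is a hypothetical sketch, not Storm code: `FieldsGroupingSketch`, `bind` and `taskFor` are invented names, and Storm's real routing uses its own hash of the selected values rather than this exact formula. In the bolt itself, the corresponding change would be something like declaring `new Fields("X", "bytes")` and emitting `new Values(domainClass.getX(), domainClass)`, where `getX()` is a hypothetical accessor on `MyDomainClass`; declaring both fields while still emitting a single value is exactly the count mismatch Kushan ran into.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of fields grouping (not Storm's classes).
class FieldsGroupingSketch {

    // Pairs declared field names with emitted values and rejects a tuple whose
    // value count differs from the declared field count (the case in which the
    // thread reports Storm complaining).
    static Map<String, Object> bind(List<String> declaredFields, List<?> emittedValues) {
        if (declaredFields.size() != emittedValues.size()) {
            throw new IllegalArgumentException("declared " + declaredFields.size()
                    + " fields but emitted " + emittedValues.size() + " values");
        }
        Map<String, Object> tuple = new HashMap<>();
        for (int i = 0; i < declaredFields.size(); i++) {
            tuple.put(declaredFields.get(i), emittedValues.get(i));
        }
        return tuple;
    }

    // Chooses a downstream task by hashing the grouping field's value, so equal
    // values always land on the same task index in [0, numTasks).
    static int taskFor(Map<String, Object> tuple, String groupingField, int numTasks) {
        if (!tuple.containsKey(groupingField)) {
            throw new IllegalArgumentException("grouping on undeclared field: " + groupingField);
        }
        return Math.floorMod(Objects.hashCode(tuple.get(groupingField)), numTasks);
    }

    public static void main(String[] args) {
        // Bolt1 declaring both "X" and "bytes" and emitting one value for each.
        List<String> declared = Arrays.asList("X", "bytes");
        Map<String, Object> first = bind(declared, Arrays.asList("store-42", "payload-1"));
        Map<String, Object> second = bind(declared, Arrays.asList("store-42", "payload-2"));
        System.out.println(taskFor(first, "X", 4) == taskFor(second, "X", 4)); // true

        // The original declaration: only "bytes" exists, so grouping on "X" fails.
        Map<String, Object> original = bind(Arrays.asList("bytes"), Arrays.asList("payload-3"));
        try {
            taskFor(original, "X", 4);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // grouping on undeclared field: X
        }
    }
}
```

For simplicity the model checks the grouping field per tuple; the point is only that tuples sharing an X value always map to the same task index.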
>>>> On Jan 20, 2015 5:08 PM, "Kushan Maskey" <[email protected]> wrote:
>>>>
>>>>> B1 and B2 are the same bolt, but running on 2 separate tasks.
>>>>>
>>>>>
>>>>> Here is the snippet of the topologyBuilder function I have.
>>>>>
>>>>> spout_parallelism_hint = 4;
>>>>> bolt_parallelism_hint = 4;
>>>>>
>>>>> private static void buildTopology(TopologyBuilder builder) {
>>>>>     KafkaSpout spout = new KafkaSpout(getSpoutConfig(propMap.get(KAFKA_TOPIC), "ID1"));
>>>>>
>>>>>     builder.setSpout(SPOUT_NAME, spout, spout_parallelism_hint);
>>>>>     builder.setBolt("MainBolt", new MainBolt(), bolt_parallelism_hint).shuffleGrouping(SPOUT_NAME);
>>>>>     builder.setBolt("B1", new Bolt1(), bolt_parallelism_hint).shuffleGrouping("MainBolt");
>>>>>     // go to store sales bolts first
>>>>>     builder.setBolt("B2", new Bolt2(), bolt_parallelism_hint).fieldsGrouping("B1", new Fields("X"));
>>>>>     // split on assoc, dept and vendor
>>>>>     builder.setBolt("B3", new Bolt3(), bolt_parallelism_hint).shuffleGrouping("B2");
>>>>> }
>>>>>
>>>>> I have a bunch of other bolts doing pretty much the same thing as above.
>>>>>
>>>>> LMK if that is sufficient. Thanks.
>>>>>
>>>>>
>>>>> --
>>>>> Kushan Maskey
>>>>>
>>>>> On Tue, Jan 20, 2015 at 3:45 PM, Nathan Leung <[email protected]> wrote:
>>>>>
>>>>>> Actually, I thought about it, and you should not have to do fieldsGrouping on both X and Y; one should be sufficient. In your original email, are B1 and B2 the same bolt but different tasks, or are they different bolts entirely? As Harsha pointed out, it may help if you give more details of how your topology is constructed.
>>>>>>
>>>>>> On Tue, Jan 20, 2015 at 4:42 PM, Kushan Maskey <[email protected]> wrote:
>>>>>>
>>>>>>> I am only doing fieldsGrouping on X, not Y. Is it necessary to group by both fields? Is there any sample document I can look at?
>>>>>>> Thanks.
>>>>>>>
>>>>>>> --
>>>>>>> Kushan Maskey
>>>>>>> 817.403.7500
>>>>>>> M. Miller & Associates <http://mmillerassociates.com/>
>>>>>>> [email protected]
>>>>>>>
>>>>>>> On Tue, Jan 20, 2015 at 3:14 PM, Nathan Leung <[email protected]> wrote:
>>>>>>>
>>>>>>>> Which fields are you doing fieldsGrouping on? If you do fields grouping on X and Y, why are you having a race condition in a separate bolt task? Each X and Y combo should always go to the same bolt task with fieldsGrouping, and the scenario you describe should work properly whether you have 1 task, 4 tasks, or 100 tasks.
>>>>>>>>
>>>>>>>> On Tue, Jan 20, 2015 at 4:11 PM, Kushan Maskey <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Not at the moment. We have been using KafkaSpout for all the other projects but have not looked into using Trident. How would it help resolve the issue we are facing at the moment? We also need to keep in mind the development time it would take to implement Trident, while KafkaSpout has been working fine with all the other projects.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Kushan Maskey
>>>>>>>>>
>>>>>>>>> On Tue, Jan 20, 2015 at 3:05 PM, Rajiv Onat <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Seems like stateful processing; have you looked at using Trident?
>>>>>>>>>>
>>>>>>>>>> -Rajiv
>>>>>>>>>>
>>>>>>>>>> On Jan 20, 2015, at 12:26 PM, Kushan Maskey <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks Keith and Itai,
>>>>>>>>>>
>>>>>>>>>> We are using fieldsGrouping. Initially we were using shuffleGrouping; we saw this problem and then moved to fieldsGrouping, with better results, until now. I think the bolt parallelism, which we have set to 4, is the culprit here.
>>>>>>>>>> My understanding of parallelism is threading; correct me if I am wrong.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Kushan Maskey
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 20, 2015 at 1:03 PM, Itai Frenkel <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> Are you familiar with fields grouping? The idea is that the same bolt instance would always update the value of a specific key (similar to web load balancer cookie stickiness).
>>>>>>>>>>>
>>>>>>>>>>> https://storm.apache.org/documentation/Concepts.html
>>>>>>>>>>>
>>>>>>>>>>> "Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks."
>>>>>>>>>>>
>>>>>>>>>>> Itai
>>>>>>>>>>>
>>>>>>>>>>> ------------------------------
>>>>>>>>>>>
>>>>>>>>>>> From: Kushan Maskey <[email protected]>
>>>>>>>>>>> Sent: Tuesday, January 20, 2015 8:55 PM
>>>>>>>>>>> To: [email protected]
>>>>>>>>>>> Subject: URGENT!! Race condition
>>>>>>>>>>>
>>>>>>>>>>> We are having a major issue trying to update a Cassandra database, where we see a race condition in a bolt.
>>>>>>>>>>>
>>>>>>>>>>> Here is an example:
>>>>>>>>>>>
>>>>>>>>>>> I have a column family with 2 partitioning columns, say X and Y. There is another column, Z, which is basically an aggregated number. We are supposed to update Z based on X and Y. Storm is reading a huge volume of data from Kafka. When the spout receives a message, the first bolt reads the database for that combination of X and Y and gets the value of Z.
>>>>>>>>>>> Then it >>>>>>>>>>> updates the value Z and store it back into the database. Bolt >>>>>>>>>>> parallelism >>>>>>>>>>> is set to be 4 which mean 4 instances of bolt are trying to update >>>>>>>>>>> the >>>>>>>>>>> database. So when first bolt (B1) read the value of Z to be say >>>>>>>>>>> 100, same >>>>>>>>>>> time the second bolt (B2) also read it to be 100, but once B1 >>>>>>>>>>> completed >>>>>>>>>>> execution and the value of Z is now 150, B2 still has 100 so the >>>>>>>>>>> value of Z >>>>>>>>>>> is out of sync. >>>>>>>>>>> >>>>>>>>>>> How can we prevent the race condition like this? This is >>>>>>>>>>> causing a major nuisance to us. >>>>>>>>>>> >>>>>>>>>>> Any help is highly appreciated. Thanks. >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Kushan Maskey >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>> -- Regards - Ernesto Reinaldo Barreiro
