Kushan, my question was about this: "B1 and B2 are the same bolt but
running on 2 separate tasks." Are they both the same code, i.e. updating
the Cassandra table? If so, don't you need to do fieldsGrouping on B1
too? -Harsha
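
To make the grouping question above concrete, here is a minimal Java sketch of how the two groupings route tuples. It is a simplified model, not Storm's actual implementation: the class GroupingSketch and the methods fieldsGroupingTask and shuffleGroupingTask are hypothetical names, and the hash-modulo routing is only an assumption that captures the general idea (same grouping values, same task). It shows that grouping on X alone already pins every (X, Y) pair with that X to one task, while a shuffle-grouped bolt can see the same X on any of its 4 tasks.

```java
import java.util.Arrays;
import java.util.Random;

public class GroupingSketch {

    // Simplified model of fields grouping (an assumption, not Storm's code):
    // hash the grouping values and map the hash onto one of the tasks.
    static int fieldsGroupingTask(int numTasks, Object... groupingValues) {
        return Math.floorMod(Arrays.hashCode(groupingValues), numTasks);
    }

    // Simplified model of shuffle grouping: any task may receive the tuple.
    static int shuffleGroupingTask(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // matches bolt_parallelism_hint in the thread
        Random random = new Random();

        // Hypothetical (X, Y) pairs; the first and last are the same key.
        String[][] tuples = {
            {"x1", "y1"}, {"x1", "y2"}, {"x2", "y1"}, {"x1", "y1"}
        };

        for (String[] t : tuples) {
            int byX = fieldsGroupingTask(numTasks, t[0]);
            int byXY = fieldsGroupingTask(numTasks, t[0], t[1]);
            int shuffled = shuffleGroupingTask(random, numTasks);
            System.out.println("X=" + t[0] + " Y=" + t[1]
                + " fields(X)->task " + byX
                + " fields(X,Y)->task " + byXY
                + " shuffle->task " + shuffled);
        }
    }
}
```

If B1 is the bolt doing the read-modify-write against Cassandra, its shuffleGrouping means two tasks can hold the same X and Y at once, which would match the stale-read behaviour described further down the thread.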
On Tue, Jan 20, 2015, at 05:35 PM, Kushan Maskey wrote:
> The bolt is pretty simple: it gets the message and then updates the
> data as I explained in the earlier email.
>
> @Override
> public synchronized void execute(Tuple input, BasicOutputCollector collector) {
>     MyDomainClass domainClass = (MyDomainClass) input.getValueByField("bytes");
>
>     if (domainClass != null) {
>         boolean success = controller.myDataUpdateFunction(domainClass);
>         if (success) {
>             collector.emit(new Values(domainClass));
>         }
>     }
> }
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     declarer.declare(new Fields("bytes"));
> }
>
> Does it sound like I am doing this correctly? I am sure there is
> something I am not doing correctly. LMK.
>
>
> --
> Kushan Maskey
>
> On Tue, Jan 20, 2015 at 4:33 PM, Nathan Leung
> <[email protected]> wrote:
>> I assume Bolt2 in the snippet is the bolt in question? What do
>> declareOutputFields and emit in Bolt1 look like? Are you able to show
>> the logic of Bolt2?
>> On Jan 20, 2015 5:08 PM, "Kushan Maskey"
>> <[email protected]> wrote:
>>> B1 and B2 are the same bolt but running on 2 separate tasks.
>>>
>>>
>>> Here is the snippet of the topologyBuilder function I have.
>>>
>>> spout_parallelism_hint = 4;
>>> bolt_parallelism_hint = 4;
>>>
>>> private static void buildTopology(TopologyBuilder builder) {
>>>     KafkaSpout spout = new KafkaSpout(
>>>         getSpoutConfig(propMap.get(KAFKA_TOPIC), "ID1"));
>>>
>>>     builder.setSpout(SPOUT_NAME, spout, spout_parallelism_hint);
>>>
>>>     builder.setBolt("MainBolt", new MainBolt(), bolt_parallelism_hint)
>>>         .shuffleGrouping(SPOUT_NAME);
>>>     builder.setBolt("B1", new Bolt1(), bolt_parallelism_hint)
>>>         .shuffleGrouping(MainBolt);
>>>
>>>     // go to store sales bolts first
>>>     builder.setBolt("B2", new Bolt2(), bolt_parallelism_hint)
>>>         .fieldsGrouping(B1, new Fields("X"));
>>>
>>>     // split on assoc, dept and vendor
>>>     builder.setBolt("B3", new Bolt3(), bolt_parallelism_hint)
>>>         .shuffleGrouping(B2);
>>> }
>>>
>>> I've got a bunch of other bolts doing pretty much the same thing as
>>> above.
>>>
>>> LMK if that is sufficient. Thanks.
>>>
>>>
>>> --
>>> Kushan Maskey
>>>
>>> On Tue, Jan 20, 2015 at 3:45 PM, Nathan Leung <[email protected]>
>>> wrote:
>>>> Actually I thought about it and you should not have to do
>>>> fieldsGrouping on both X and Y; one should be sufficient. In your
>>>> original email, are B1 and B2 the same bolt, but different tasks,
>>>> or are they different bolts entirely? As Harsha pointed out, it may
>>>> help if you give more details of how your topology is constructed.
>>>>
>>>> On Tue, Jan 20, 2015 at 4:42 PM, Kushan Maskey
>>>> <[email protected]> wrote:
>>>>> I am only fieldGrouping on X and not Y. Is it necessary to
>>>>> fieldGroup by both the fields? Is there any sample document I can
>>>>> look at? Thanks.
>>>>>
>>>>> --
>>>>> Kushan Maskey
>>>>> 817.403.7500
>>>>> M. Miller & Associates[1] [email protected]
>>>>>
>>>>> On Tue, Jan 20, 2015 at 3:14 PM, Nathan Leung <[email protected]>
>>>>> wrote:
>>>>>> Which fields are you doing fieldsGrouping on? If you do fields
>>>>>> grouping on X and Y, why are you having a race condition in a
>>>>>> separate bolt task? Each X and Y combo should always go to the
>>>>>> same bolt task with fieldsGrouping, and the scenario you describe
>>>>>> should work properly whether you have 1 task, 4 tasks, or 100
>>>>>> tasks.
>>>>>>
>>>>>> On Tue, Jan 20, 2015 at 4:11 PM, Kushan Maskey
>>>>>> <[email protected]> wrote:
>>>>>>> Not at the moment. We have been using KafkaSpout for all the
>>>>>>> other projects but have not looked into using Trident. How would
>>>>>>> it help resolve the issue we are facing at the moment? We also
>>>>>>> need to keep in mind the development time it would take to
>>>>>>> implement Trident, while KafkaSpout has been working fine with
>>>>>>> all the other projects.
>>>>>>>
>>>>>>> --
>>>>>>> Kushan Maskey
>>>>>>>
>>>>>>> On Tue, Jan 20, 2015 at 3:05 PM, Rajiv Onat <[email protected]>
>>>>>>> wrote:
>>>>>>>> Seems like stateful processing. Have you looked at using
>>>>>>>> Trident?
>>>>>>>>
>>>>>>>> -Rajiv
>>>>>>>>
>>>>>>>> On Jan 20, 2015, at 12:26 PM, Kushan Maskey
>>>>>>>> <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thanks Keith and Itai,
>>>>>>>>>
>>>>>>>>> We are using fieldsGrouping. Initially we were using
>>>>>>>>> shuffleGrouping; we saw this problem and then moved to
>>>>>>>>> fieldsGrouping, with better results, until now. I am thinking
>>>>>>>>> the bolt parallelism, which we have set to 4, is the
>>>>>>>>> culprit here. My understanding of parallelism is threading;
>>>>>>>>> correct me if I am incorrect.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Kushan Maskey
>>>>>>>>>
>>>>>>>>> On Tue, Jan 20, 2015 at 1:03 PM, Itai Frenkel
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> Are you familiar with fields grouping? The idea is that the
>>>>>>>>>> same bolt instance would always update the value of a
>>>>>>>>>> specific key (similar to web load balancer cookie
>>>>>>>>>> stickiness).
>>>>>>>>>> https://storm.apache.org/documentation/Concepts.html
>>>>>>>>>> "Fields grouping: The stream is partitioned by the
>>>>>>>>>> fields specified in the grouping. For example, if the stream
>>>>>>>>>> is grouped by the "user-id" field, tuples with the same
>>>>>>>>>> "user-id" will always go to the same task, but tuples with
>>>>>>>>>> different "user-id"'s may go to different tasks."
>>>>>>>>>>
>>>>>>>>>> Itai
>>>>>>>>>>
>>>>>>>>>> *From:* Kushan Maskey <[email protected]>
>>>>>>>>>> *Sent:* Tuesday, January 20, 2015 8:55 PM
>>>>>>>>>> *To:* [email protected]
>>>>>>>>>> *Subject:* URGENT!! Race condition
>>>>>>>>>>
>>>>>>>>>> We are having a major issue trying to update a Cassandra
>>>>>>>>>> database, where we see a race condition in a bolt.
>>>>>>>>>>
>>>>>>>>>> Here is an example,
>>>>>>>>>>
>>>>>>>>>> I have a column family with 2 partitioning columns,
>>>>>>>>>> say X and Y. There is another column, Z, which is basically an
>>>>>>>>>> aggregated number. We are supposed to update Z based on X and
>>>>>>>>>> Y. Storm is reading a huge volume of data from Kafka. When the
>>>>>>>>>> spout receives a message, the first bolt reads the database for
>>>>>>>>>> that combination of X and Y and gets the value of Z. Then it
>>>>>>>>>> updates the value of Z and stores it back into the database.
>>>>>>>>>> Bolt parallelism is set to 4, which means 4 instances of the
>>>>>>>>>> bolt are trying to update the database. So when the first bolt
>>>>>>>>>> (B1) reads the value of Z to be, say, 100, at the same time the
>>>>>>>>>> second bolt (B2) also reads it to be 100. Once B1 completes
>>>>>>>>>> execution the value of Z is 150, but B2 still has 100, so the
>>>>>>>>>> value of Z is out of sync.
>>>>>>>>>>
>>>>>>>>>> How can we prevent a race condition like this? It is
>>>>>>>>>> causing a major nuisance to us.
>>>>>>>>>>
>>>>>>>>>> Any help is highly appreciated. Thanks.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Kushan Maskey
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>
Links:
1. http://mmillerassociates.com/