The bolt is pretty simple: it gets the message and then updates the data, as I
explained in the earlier email.
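@Override
// Note: synchronized locks only this task's bolt instance; other tasks of the
// same bolt are separate instances and are not blocked by it.
public synchronized void execute(Tuple input, BasicOutputCollector collector) {
    // The upstream bolt emits the domain object in the "bytes" field.
    MyDomainClass domainClass = (MyDomainClass) input.getValueByField("bytes");
    if (domainClass != null) {
        // Reads Z for this (X, Y), updates it and writes it back to Cassandra.
        boolean success = controller.myDataUpdateFunction(domainClass);
        if (success) {
            // Pass the updated object downstream.
            collector.emit(new Values(domainClass));
        }
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Only "bytes" is declared here; a downstream fieldsGrouping on "X"
    // would need "X" to be declared (and emitted) as a field as well.
    declarer.declare(new Fields("bytes"));
}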
Does it look like I am doing this correctly? I am sure there is something I am
not doing right. LMK.
--
Kushan Maskey
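A note on the `synchronized` keyword in `execute` above: it locks only the bolt instance it is called on. With a parallelism hint of 4, each task is its own instance (and, with more than one worker, possibly in a different JVM), so the keyword does not stop two tasks from updating the same row at once. The stdlib-only sketch below illustrates this within one JVM; `FakeBolt` is a hypothetical stand-in, not a Storm class, and the 2-second timeout is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SynchronizedPerInstance {

    // Hypothetical stand-in for one bolt task; not a Storm class.
    static class FakeBolt {
        // Like "public synchronized void execute(...)": locks only this instance.
        synchronized boolean execute(CountDownLatch bothInside) throws InterruptedException {
            bothInside.countDown();
            // True only if the other thread also got inside execute() within 2 seconds.
            return bothInside.await(2, TimeUnit.SECONDS);
        }
    }

    // Runs two threads through execute(); true if both were inside it at the same time.
    static boolean bothEnteredConcurrently(boolean shareOneInstance) {
        FakeBolt first = new FakeBolt();
        FakeBolt second = shareOneInstance ? first : new FakeBolt();
        CountDownLatch bothInside = new CountDownLatch(2);
        boolean[] entered = new boolean[2];

        Thread t1 = new Thread(() -> entered[0] = call(first, bothInside));
        Thread t2 = new Thread(() -> entered[1] = call(second, bothInside));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return entered[0] && entered[1];
    }

    private static boolean call(FakeBolt bolt, CountDownLatch latch) {
        try {
            return bolt.execute(latch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Two tasks = two instances: both threads are inside the synchronized method at once.
        System.out.println("separate instances: " + bothEnteredConcurrently(false));
        // One shared instance: the second thread has to wait for the first to leave.
        System.out.println("shared instance:    " + bothEnteredConcurrently(true));
    }
}
```

If two threads can be inside `execute` on separate instances at the same time within one JVM, tasks in separate workers can too, so per-key ordering has to come from the grouping (or from the database), not from Java locks.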
On Tue, Jan 20, 2015 at 4:33 PM, Nathan Leung <[email protected]> wrote:
> I assume Bolt2 in the snippet is the bolt in question? What do
> declareOutputFields and emit in Bolt1 look like? Are you able to show the
> logic of Bolt2?
> On Jan 20, 2015 5:08 PM, "Kushan Maskey" <
> [email protected]> wrote:
>
>> B1 and B2 are the same bolt but running on 2 separate tasks.
>>
>>
>> Here is a snippet of the function that builds my topology.
>>
>> private static int spout_parallelism_hint = 4;
>> private static int bolt_parallelism_hint = 4;
>>
>> private static void buildTopology(TopologyBuilder builder) {
>>     KafkaSpout spout =
>>             new KafkaSpout(getSpoutConfig(propMap.get(KAFKA_TOPIC), "ID1"));
>>
>>     builder.setSpout(SPOUT_NAME, spout, spout_parallelism_hint);
>>     builder.setBolt("MainBolt", new MainBolt(), bolt_parallelism_hint)
>>             .shuffleGrouping(SPOUT_NAME);
>>     builder.setBolt("B1", new Bolt1(), bolt_parallelism_hint)
>>             .shuffleGrouping("MainBolt");
>>     // go to store sales bolts first
>>     builder.setBolt("B2", new Bolt2(), bolt_parallelism_hint)
>>             .fieldsGrouping("B1", new Fields("X"));
>>     // split on assoc, dept and vendor
>>     builder.setBolt("B3", new Bolt3(), bolt_parallelism_hint)
>>             .shuffleGrouping("B2");
>> }
>> I have a bunch of other bolts doing pretty much the same thing.
>>
>> LMK if that is sufficient. Thanks.
>>
>>
>> --
>> Kushan Maskey
>>
>> On Tue, Jan 20, 2015 at 3:45 PM, Nathan Leung <[email protected]> wrote:
>>
>>> Actually I thought about it and you should not have to do fieldsGrouping
>>> on both X and Y; one should be sufficient. In your original email, are B1
>>> and B2 the same bolt, but different tasks, or are they different bolts
>>> entirely? As Harsha pointed out, it may help if you give more details of
>>> how your topology is constructed.
>>>
>>> On Tue, Jan 20, 2015 at 4:42 PM, Kushan Maskey <
>>> [email protected]> wrote:
>>>
>>>> I am only using fieldsGrouping on X, not Y. Is it necessary to group
>>>> by both fields? Is there any sample documentation I can look at? Thanks.
>>>>
>>>> --
>>>> Kushan Maskey
>>>> 817.403.7500
>>>> M. Miller & Associates <http://mmillerassociates.com/>
>>>> [email protected]
>>>>
>>>> On Tue, Jan 20, 2015 at 3:14 PM, Nathan Leung <[email protected]>
>>>> wrote:
>>>>
>>>>> Which fields are you doing fieldsGrouping on? If you do fieldsGrouping
>>>>> on X and Y, why would you have a race condition in a separate bolt task?
>>>>> Each X and Y combo should always go to the same bolt task with
>>>>> fieldsGrouping, and the scenario you describe should work properly whether
>>>>> you have 1 task, 4 tasks, or 100 tasks.
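Nathan's point can be made concrete with a simplified model of fields grouping in which the target task is a hash of the grouping values modulo the number of tasks. This model is an assumption for illustration, not Storm's actual implementation; the property it shows is that the choice depends only on the key, so every tuple for a given (X, Y) reaches the same task for any task count, and grouping on X alone also keeps each (X, Y) pair on a single task.

```java
import java.util.List;

public class FieldsGroupingModel {

    // Simplified model (an assumption, not Storm's code): the target task is a
    // pure function of the grouping-field values.
    static int chooseTask(List<?> groupingValues, int numTasks) {
        // floorMod keeps the index in [0, numTasks) even for negative hash codes.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    // fieldsGrouping on ("X", "Y")
    static int routeByXY(String x, String y, int numTasks) {
        return chooseTask(List.of(x, y), numTasks);
    }

    // fieldsGrouping on ("X") only: y is deliberately ignored, so every pair
    // with the same x lands on the same task.
    static int routeByX(String x, String y, int numTasks) {
        return chooseTask(List.of(x), numTasks);
    }

    public static void main(String[] args) {
        for (int numTasks : new int[] {1, 4, 100}) {
            System.out.printf(
                    "tasks=%d  by (X,Y): (x1,y1)->%d, again->%d  by X only: (x1,y1)->%d, (x1,y2)->%d%n",
                    numTasks,
                    routeByXY("x1", "y1", numTasks),
                    routeByXY("x1", "y1", numTasks),
                    routeByX("x1", "y1", numTasks),
                    routeByX("x1", "y2", numTasks));
        }
    }
}
```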
>>>>>
>>>>> On Tue, Jan 20, 2015 at 4:11 PM, Kushan Maskey <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Not at the moment. We have been using KafkaSpout for all the other
>>>>>> projects but have not looked into using Trident. How would it help
>>>>>> resolve the issue we are facing at the moment? We also need to keep in
>>>>>> mind the development time it would take to implement Trident, while
>>>>>> KafkaSpout has been working fine with all the other projects.
>>>>>>
>>>>>> --
>>>>>> Kushan Maskey
>>>>>>
>>>>>> On Tue, Jan 20, 2015 at 3:05 PM, Rajiv Onat <[email protected]> wrote:
>>>>>>
>>>>>>> This seems like stateful processing; have you looked at using Trident?
>>>>>>>
>>>>>>> -Rajiv
>>>>>>>
>>>>>>> On Jan 20, 2015, at 12:26 PM, Kushan Maskey <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>> Thanks Keith and Itai,
>>>>>>>
>>>>>>> We are using fieldsGrouping. Initially we were using shuffleGrouping;
>>>>>>> we saw this problem, then moved to fieldsGrouping, with better results
>>>>>>> until now. I think the bolt parallelism, which we have set to 4, is the
>>>>>>> culprit here. My understanding is that parallelism means threading;
>>>>>>> correct me if I am wrong.
>>>>>>>
>>>>>>> --
>>>>>>> Kushan Maskey
>>>>>>>
>>>>>>> On Tue, Jan 20, 2015 at 1:03 PM, Itai Frenkel <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>>
>>>>>>>> Are you familiar with fields grouping? The idea is that the same
>>>>>>>> bolt instance would always update the value of a specific key (similar
>>>>>>>> to web load balancer cookie stickiness).
>>>>>>>>
>>>>>>>> https://storm.apache.org/documentation/Concepts.html
>>>>>>>>
>>>>>>>> *"Fields grouping: The stream is partitioned by the fields specified
>>>>>>>> in the grouping. For example, if the stream is grouped by the "user-id"
>>>>>>>> field, tuples with the same "user-id" will always go to the same task,
>>>>>>>> but tuples with different "user-id"'s may go to different tasks."*
>>>>>>>>
>>>>>>>>
>>>>>>>> Itai
>>>>>>>>
>>>>>>>> ------------------------------
>>>>>>>>
>>>>>>>> *From:* Kushan Maskey <[email protected]>
>>>>>>>> *Sent:* Tuesday, January 20, 2015 8:55 PM
>>>>>>>> *To:* [email protected]
>>>>>>>> *Subject:* URGENT!! Race condition
>>>>>>>>
>>>>>>>> We are having a major issue trying to update a Cassandra database,
>>>>>>>> where we see a race condition in a bolt.
>>>>>>>>
>>>>>>>> Here is an example,
>>>>>>>>
>>>>>>>> I have a column family with 2 partitioning columns, say X and Y, and
>>>>>>>> another column Z, which is basically an aggregated number. We are
>>>>>>>> supposed to update Z based on X and Y. Storm is reading a huge volume
>>>>>>>> of data from Kafka. When the spout receives a message, the first bolt
>>>>>>>> reads the database for that combination of X and Y and gets the value
>>>>>>>> of Z. Then it updates the value of Z and stores it back into the
>>>>>>>> database. Bolt parallelism is set to 4, which means 4 instances of the
>>>>>>>> bolt are trying to update the database. So when the first bolt (B1)
>>>>>>>> reads the value of Z as, say, 100, at the same time the second bolt
>>>>>>>> (B2) also reads it as 100. Once B1 completes execution, the value of Z
>>>>>>>> is 150, but B2 still has 100, so the value of Z is out of sync.
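The interleaving described above is the classic lost update. A minimal deterministic replay follows, using an in-memory map as a hypothetical stand-in for the Cassandra row and assuming B2 adds 30 (the email does not say what B2 adds).

```java
import java.util.HashMap;
import java.util.Map;

public class LostUpdateDemo {

    // Hypothetical key for one (X, Y) combination in the column family.
    static final String KEY = "x1|y1";

    // Replays the interleaving from the email: both tasks read Z before either writes.
    static int interleaved(int initialZ, int deltaB1, int deltaB2) {
        Map<String, Integer> table = new HashMap<>();
        table.put(KEY, initialZ);
        int readByB1 = table.get(KEY);          // B1 reads Z
        int readByB2 = table.get(KEY);          // B2 reads the same Z
        table.put(KEY, readByB1 + deltaB1);     // B1 writes its result
        table.put(KEY, readByB2 + deltaB2);     // B2 overwrites it using its stale read
        return table.get(KEY);
    }

    // The same two updates applied one after the other, as a single task per key would.
    static int serialized(int initialZ, int deltaB1, int deltaB2) {
        Map<String, Integer> table = new HashMap<>();
        table.put(KEY, initialZ);
        table.put(KEY, table.get(KEY) + deltaB1);
        table.put(KEY, table.get(KEY) + deltaB2);
        return table.get(KEY);
    }

    public static void main(String[] args) {
        System.out.println("interleaved: " + interleaved(100, 50, 30)); // 130
        System.out.println("serialized:  " + serialized(100, 50, 30));  // 180
    }
}
```

Applying the two updates one after the other gives 180; the interleaved version gives 130 because B2 writes back a value computed from its stale read of 100, and B1's +50 is lost.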
>>>>>>>>
>>>>>>>> How can we prevent a race condition like this? It is causing a major
>>>>>>>> problem for us.
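Two common remedies follow from the replies in this thread and from Cassandra itself: make sure only one task ever updates a given (X, Y) (fieldsGrouping on the key, with the upstream bolt emitting that key as a field), or make the write conditional so a stale read is detected and retried. Cassandra's lightweight transactions (`UPDATE ... IF z = <old value>`) and counter columns (`SET z = z + n`) are worth checking against the Cassandra version in use. The sketch below shows only the compare-and-set retry idea, with a `ConcurrentHashMap` standing in for the table; it is not Cassandra client code, and the key name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class CompareAndSetDemo {

    // Hypothetical stand-in for the Cassandra table: key "X|Y" -> aggregated Z.
    static final ConcurrentHashMap<String, Integer> table = new ConcurrentHashMap<>();

    // Read Z, compute the new value, and write it only if Z is still what we read;
    // otherwise another task changed it in between, so re-read and retry.
    static void addToZ(String key, int delta) {
        while (true) {
            Integer current = table.get(key);
            int updated = current + delta;
            if (table.replace(key, current, updated)) {
                return;
            }
        }
    }

    // Starts `tasks` threads that each add 1 to Z `updatesPerTask` times.
    static int runConcurrently(int tasks, int updatesPerTask) {
        table.put("x1|y1", 100);
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            Thread th = new Thread(() -> {
                for (int i = 0; i < updatesPerTask; i++) {
                    addToZ("x1|y1", 1);
                }
            });
            threads.add(th);
            th.start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return table.get("x1|y1");
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 10_000)); // 40100
    }
}
```

With 4 threads each adding 1 ten thousand times to a starting value of 100, the result is always 40100, because `replace(key, expected, updated)` succeeds only when no other thread changed the value since it was read.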
>>>>>>>>
>>>>>>>> Any help is highly appreciated. Thanks.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Kushan Maskey
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>