Thanks Nathan. Your info is appreciated.

On Fri, May 15, 2015 at 6:36 AM, Nathan Leung <[email protected]> wrote:

> You should not get this if you're creating a new object every time. The
> rule of thumb is that data in tuples should be considered immutable. You
> can break the rule, but it requires care; otherwise you will encounter
> subtle problems.
>
> Given your timeline, this only makes sense if you reuse an object somehow.
>
> As a note, when Storm passes a tuple in-process, it passes it by reference,
> but when the tuple goes over the network it is serialized. This is why you
> cannot change data in a tuple once it has been sent: the downstream bolt
> could have started processing it. If you have any fan-out, any other bolts
> that received this tuple could be using it as well.
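
The by-reference behaviour described above can be illustrated without Storm at all. The following is only a minimal sketch: the queue stands in for an in-process hand-off between two bolts, and MutablePayload and AliasingDemo are hypothetical names, not Storm classes. It shows that changing an object after it has been handed off changes what the receiver sees, while creating a fresh object per emit does not.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for a mutable payload such as SimplePojo; not a Storm class.
final class MutablePayload {
    long id;

    MutablePayload(long id) {
        this.id = id;
    }
}

final class AliasingDemo {
    // Simulates an in-process hand-off: the queue stores a reference, not a copy.
    static long reuseAfterEmit() {
        Queue<MutablePayload> inProcessQueue = new ArrayDeque<>();
        MutablePayload payload = new MutablePayload(1L);
        inProcessQueue.add(payload);        // "emit" the object holding pojo_1's values
        payload.id = 2L;                    // sender overwrites it with pojo_2's values
        return inProcessQueue.remove().id;  // receiver reads the overwritten value
    }

    // Same hand-off, but the sender creates a new object for every emit.
    static long freshObjectPerEmit() {
        Queue<MutablePayload> inProcessQueue = new ArrayDeque<>();
        inProcessQueue.add(new MutablePayload(1L));
        inProcessQueue.add(new MutablePayload(2L));
        return inProcessQueue.remove().id;  // first object is untouched
    }

    public static void main(String[] args) {
        System.out.println("reused object, receiver sees id=" + reuseAfterEmit());
        System.out.println("fresh object, receiver sees id=" + freshObjectPerEmit());
    }
}
```

In the reused case the receiver reads 2 even though 1 was the value at hand-off time; with a fresh object per emit it reads 1. Whether this is what happened in the topology discussed here is not established by the sketch; it only shows the mechanism.
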
> On May 15, 2015 1:21 AM, "Banias H" <[email protected]> wrote:
>
>> Thanks Nathan.
>>
>> Sorry for the confusion. Bolt_A actually generates a new SimplePojo
>> object in every execute() function. So I shouldn't be reusing the same pojo
>> object in Bolt_A.
>>
>> To give you more details, from the same worker log file I am seeing
>> (without the copy workaround):
>>
>> t=1: Bolt_A generated pojo_1 and completed collector.emit(new
>> Values(pojo_1))
>>
>> t=2: Bolt_B started.
>>    - Obtained pojo through tuple: SimplePojo pojo = (SimplePojo)
>> tuple.getValue(0);
>>    - Printed out pojo to have values of pojo_1
>>    - Continued running execute()
>>
>> t=3: Bolt_A generated pojo_2 and completed collector.emit(new
>> Values(pojo_2))
>>
>> t=4: Bolt_B printed out the values of pojo again before the end of
>> execute(). It however has the values of pojo_2.
>>
>> t=5: Bolt_B completed execute()
>>
>> I checked the worker log files in other hosts and verified that pojo_2
>> got emitted to different host/worker.
>>
>>
>> Would you kindly share the way to safely pass objects between bolts? Many
>> thanks. I would really appreciate more info on this.
>>
>> On Thu, May 14, 2015 at 8:15 PM, Nathan Leung <[email protected]> wrote:
>>
>>> It sounds like you reuse the same simple pojo object in bolt a. You
>>> should avoid doing this; even with your copy workaround it's possible to
>>> run into more subtle race conditions.
>>> On May 14, 2015 8:51 PM, "Banias H" <[email protected]> wrote:
>>>
>>>> In prototyping an application, I use a simple pojo (see below) to send
>>>> data between bolts. In an earlier version of my bolt, I would simply get
>>>> the pojo using tuple.getValue() inside the execute() function, like:
>>>>
>>>> SimplePojo pojo = (SimplePojo) tuple.getValue(0);
>>>>
>>>> But when I scale up and have many executors running on the same worker,
>>>> I run into a data issue where the pojo has different values within a
>>>> single execute() call. Upon closer look, it seems that when the same
>>>> worker ran many instances of the two different bolts, it would do
>>>> something like:
>>>>
>>>> t=1: Run Bolt_A with pojo_1
>>>> t=2: Run Bolt_B with pojo_1 and run Bolt_A with pojo_2.
>>>>
>>>> In my topology, Bolt_B runs substantially longer. At t=2, when I
>>>> printed out the values of pojo at the beginning of Bolt_B's execute(), it
>>>> had values in pojo_1. Then before the end of Bolt_B's execute() when I
>>>> printed values of pojo again, it had values in pojo_2. In other words, it
>>>> is like the pojo pointed to different SimplePojo within the execute()
>>>> function.
>>>>
>>>> So I ended up putting a copy constructor in SimplePojo and creating a new
>>>> SimplePojo inside the execute() function, like:
>>>>
>>>> SimplePojo sources = new SimplePojo((SimplePojo) tuple.getValue(0));
>>>>
>>>> After that I no longer encounter the data issue above. So my question
>>>> is... should I always make a copy of the data in Tuple in bolt?
>>>>
>>>> Thanks,
>>>> B
>>>>
>>>>
>>>>
>>>> public class SimplePojo implements KryoSerializable {
>>>> private long id1;
>>>> private long id2;
>>>> private byte[] bin1;
>>>> private byte[] bin2;
>>>>
>>>> public SimplePojo(SimplePojo source) {
>>>> id1 = source.getId1();
>>>> id2 = source.getId2();
>>>> bin1 = source.getBin1();
>>>> bin2 = source.getBin2();
>>>> }
>>>>
>>>> //getters
>>>> public long getId1() {
>>>> return id1;
>>>> }
>>>>
>>>> public long getId2() {
>>>> return id2;
>>>> }
>>>>
>>>> public byte[] getBin1() {
>>>> return bin1;
>>>> }
>>>>
>>>> public byte[] getBin2() {
>>>> return bin2;
>>>> }
>>>>
>>>> ...
>>>>
>>>> @Override
>>>> public void read(Kryo kryo, Input input) {
>>>> id1 = input.readLong();
>>>> id2 = input.readLong();
>>>>
>>>> int bin1Len = input.readInt();
>>>> bin1 = input.readBytes(bin1Len); //Line 156
>>>>
>>>> int bin2Len = input.readInt();
>>>> bin2 = input.readBytes(bin2Len);
>>>> }
>>>>
>>>> @Override
>>>> public void write(Kryo kryo, Output output) {
>>>> output.writeLong(id1);
>>>> output.writeLong(id2);
>>>>
>>>> output.writeInt(bin1.length);
>>>> output.write(bin1);
>>>>
>>>> output.writeInt(bin2.length);
>>>> output.write(bin2);
>>>> }
>>>> }
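
One detail of the copy constructor quoted above: it assigns the byte[] references returned by getBin1() and getBin2(), so the copy still shares both arrays with its source. If the goal is a fully independent copy, the arrays need to be copied too. Below is a minimal sketch of that idea, assuming a trimmed-down, hypothetical CopiedPojo with one id and one array and with the Kryo methods left out; it is not the original class.

```java
import java.util.Arrays;

// Hypothetical, trimmed-down variant of SimplePojo (Kryo read/write omitted).
final class CopiedPojo {
    private final long id1;
    private final byte[] bin1;

    CopiedPojo(long id1, byte[] bin1) {
        this.id1 = id1;
        // Copy the contents so later changes to the caller's array are not visible here.
        this.bin1 = Arrays.copyOf(bin1, bin1.length);
    }

    // Copy constructor: delegates to the constructor above, which copies the array.
    CopiedPojo(CopiedPojo source) {
        this(source.id1, source.bin1);
    }

    long getId1() {
        return id1;
    }

    // Return a copy so callers cannot modify the internal array.
    byte[] getBin1() {
        return Arrays.copyOf(bin1, bin1.length);
    }
}

final class DeepCopyDemo {
    static boolean copyIsIndependent() {
        byte[] raw = {1, 2, 3};
        CopiedPojo original = new CopiedPojo(7L, raw);
        CopiedPojo copy = new CopiedPojo(original);

        raw[0] = 99;                 // change the array the original was built from
        original.getBin1()[1] = 99;  // change an array returned by a getter

        byte[] expected = {1, 2, 3};
        return Arrays.equals(copy.getBin1(), expected)
                && Arrays.equals(original.getBin1(), expected);
    }

    public static void main(String[] args) {
        System.out.println("copy independent of source arrays: " + copyIsIndependent());
    }
}
```

Making the fields final and copying on both input and output keeps the object effectively immutable, which matches the rule of thumb that data in tuples should be treated as immutable. The extra array copies cost time and memory, so this is a trade-off rather than a requirement.
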
>>>>
>>>
>>
