You should not get this if you're creating a new object every time. The
rule of thumb is that data in tuples should be considered immutable; you
can break the rule, but it requires care, otherwise you will encounter
subtle problems.
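
One way to follow that rule of thumb is to make the payload class itself immutable. The sketch below uses a hypothetical `ImmutablePojo`, which is not part of Storm or of the code in this thread. Its fields are final, and it makes defensive copies of the byte arrays. How it would be registered with Kryo is left out.

```java
// Hypothetical immutable variant of SimplePojo (illustration only).
// Every field is final, and the byte arrays are copied on the way in and
// on the way out, so no bolt holding a reference can change what others see.
final class ImmutablePojo {
    private final long id1;
    private final long id2;
    private final byte[] bin1;
    private final byte[] bin2;

    ImmutablePojo(long id1, long id2, byte[] bin1, byte[] bin2) {
        this.id1 = id1;
        this.id2 = id2;
        // Defensive copies: later writes to the caller's arrays do not leak in.
        this.bin1 = bin1.clone();
        this.bin2 = bin2.clone();
    }

    long getId1() { return id1; }
    long getId2() { return id2; }

    // Return copies so a reader cannot mutate the internal state.
    byte[] getBin1() { return bin1.clone(); }
    byte[] getBin2() { return bin2.clone(); }

    // A "modification" builds a new object instead of changing this one.
    ImmutablePojo withId2(long newId2) {
        return new ImmutablePojo(id1, newId2, bin1, bin2);
    }
}
```

A bolt that needs different values builds a new instance (for example via `withId2`) and emits that. It never changes an object it has already emitted.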

Given your timeline, this only makes sense if you reuse an object somehow.
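
The sketch below illustrates what "reuse an object" means here. It contrasts two bolts. The first keeps one payload in a field and overwrites it before every emit. The second allocates a new payload per call. `MutablePojo` and `ReuseDemo` are hypothetical, a plain `List` stands in for the in-process hand-off, and no Storm API is used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable payload (illustration only, not Storm's API).
final class MutablePojo {
    long id;

    MutablePojo(long id) {
        this.id = id;
    }
}

// Each loop iteration stands for one execute() call. "Emitting" only stores
// a reference in the list, modelling a hand-off inside the same worker.
final class ReuseDemo {
    // Anti-pattern: one object kept in a field and reused across calls.
    private final MutablePojo reused = new MutablePojo(0);

    List<MutablePojo> emitReused(long... ids) {
        List<MutablePojo> queue = new ArrayList<>();
        for (long id : ids) {
            reused.id = id;    // overwrites the object that was already emitted
            queue.add(reused); // the same reference every time
        }
        return queue;
    }

    // Safe pattern: a new object per call.
    static List<MutablePojo> emitFresh(long... ids) {
        List<MutablePojo> queue = new ArrayList<>();
        for (long id : ids) {
            queue.add(new MutablePojo(id));
        }
        return queue;
    }
}
```

With `emitReused(1, 2)`, both queued entries end up showing id 2. With `emitFresh(1, 2)`, each entry keeps its own id.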

As a note, when Storm passes a tuple in-process it passes it by reference,
but when it goes over the network the tuple is serialized. This is why you
cannot change data in a tuple once it has been sent: the downstream bolt
could have started processing it, and if you have any fan-out, any other
bolts that received this tuple could be using it too.
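
The in-process versus network difference can be sketched without Storm. Here `passByReference` models the same-worker case, where the receiver holds the sender's object. `passSerialized` models a network hop by writing the fields to bytes and reading them back. `Payload` and `HandoffDemo` are hypothetical. The sketch uses `java.io` data streams in place of Kryo purely for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical cut-down SimplePojo: one long and one byte array.
final class Payload {
    long id;
    byte[] bin;

    Payload(long id, byte[] bin) {
        this.id = id;
        this.bin = bin;
    }
}

final class HandoffDemo {
    // Same-worker hand-off: the receiver gets the very same object.
    static Payload passByReference(Payload p) {
        return p;
    }

    // Network hand-off, approximated with java.io streams instead of Kryo:
    // the receiver gets an independent object rebuilt from bytes.
    static Payload passSerialized(Payload p) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buf)) {
                out.writeLong(p.id);
                out.writeInt(p.bin.length);
                out.write(p.bin);
            }
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buf.toByteArray()))) {
                long id = in.readLong();
                byte[] bin = new byte[in.readInt()];
                in.readFully(bin);
                return new Payload(id, bin);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Mutating the sender's object after the hand-off changes what the local receiver sees, but not the remote copy. This is one way a bug like this can show up only when both bolts run in the same worker.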
On May 15, 2015 1:21 AM, "Banias H" <[email protected]> wrote:

> Thanks Nathan.
>
> Sorry for the confusion. Bolt_A actually generates a new SimplePojo object
> in every execute() function. So I shouldn't be reusing the same pojo object
> in Bolt_A.
>
> To give you more details, from the same worker log file I am seeing
> (without the copy workaround):
>
> t=1: Bolt_A generated pojo_1 and completed collector.emit(new
> Values(pojo_1))
>
> t=2: Bolt_B started.
>    - Obtained pojo through tuple: SimplePojo pojo = (SimplePojo)
> tuple.getValue(0);
>    - Printed out pojo to have values of pojo_1
>    - Continued running execute()
>
> t=3: Bolt_A generated pojo_2 and completed collector.emit(new
> Values(pojo_2))
>
> t=4: Bolt_B printed out the values of pojo again before the end of
> execute(). It however has the values of pojo_2.
>
> t=5: Bolt_B completed execute()
>
> I checked the worker log files on other hosts and verified that pojo_2 got
> emitted to a different host/worker.
>
>
> Would you kindly share how to safely pass objects between bolts? Many
> thanks. I would really appreciate more info on this.
>
> On Thu, May 14, 2015 at 8:15 PM, Nathan Leung <[email protected]> wrote:
>
>> It sounds like you reuse the same SimplePojo object in Bolt_A. You
>> should avoid doing this; even with your copy workaround it's possible to
>> run into more subtle race conditions.
>> On May 14, 2015 8:51 PM, "Banias H" <[email protected]> wrote:
>>
>>> In prototyping an application, I use a simple pojo (see below) to send
>>> data between bolts. In an earlier version of my bolt, I would simply get
>>> the pojo using tuple.getValue() inside the execute() function, like:
>>>
>>> SimplePojo pojo = (SimplePojo) tuple.getValue(0);
>>>
>>> But when I scale up and have many executors running on the same worker,
>>> I run into a data issue in which the pojo has different values within a
>>> single execute() call. Upon closer look, it seems that when the same
>>> worker ran many executors of the two different bolts, it would do
>>> something like:
>>>
>>> t=1: Run Bolt_A with pojo_1
>>> t=2: Run Bolt_B with pojo_1 and run Bolt_A with pojo_2.
>>>
>>> In my topology, Bolt_B runs substantially longer. At t=2, when I printed
>>> out the values of pojo at the beginning of Bolt_B's execute(), it had
>>> the values of pojo_1. Then, before the end of Bolt_B's execute(), when I
>>> printed the values of pojo again, it had the values of pojo_2. In other
>>> words, it was as if pojo pointed to a different SimplePojo partway
>>> through the execute() function.
>>>
>>> So I ended up adding a copy constructor to SimplePojo and creating a new
>>> SimplePojo inside the execute() function, like:
>>>
>>> SimplePojo sources = new SimplePojo((SimplePojo) tuple.getValue(0));
>>>
>>> After that I no longer encountered the data issue above. So my question
>>> is... should I always make a copy of the data in a Tuple inside a bolt?
>>>
>>> Thanks,
>>> B
>>>
>>>
>>>
>>> import com.esotericsoftware.kryo.Kryo;
>>> import com.esotericsoftware.kryo.KryoSerializable;
>>> import com.esotericsoftware.kryo.io.Input;
>>> import com.esotericsoftware.kryo.io.Output;
>>>
>>> public class SimplePojo implements KryoSerializable {
>>>     private long id1;
>>>     private long id2;
>>>     private byte[] bin1;
>>>     private byte[] bin2;
>>>
>>>     // Copy constructor: clone the arrays so the copy does not share
>>>     // mutable byte[] state with the source object.
>>>     public SimplePojo(SimplePojo source) {
>>>         id1 = source.getId1();
>>>         id2 = source.getId2();
>>>         bin1 = source.getBin1().clone();
>>>         bin2 = source.getBin2().clone();
>>>     }
>>>
>>>     // getters
>>>     public long getId1() {
>>>         return id1;
>>>     }
>>>
>>>     public long getId2() {
>>>         return id2;
>>>     }
>>>
>>>     public byte[] getBin1() {
>>>         return bin1;
>>>     }
>>>
>>>     public byte[] getBin2() {
>>>         return bin2;
>>>     }
>>>
>>>     ...
>>>
>>>     // Deserialize the fields in the same order write() emits them.
>>>     @Override
>>>     public void read(Kryo kryo, Input input) {
>>>         id1 = input.readLong();
>>>         id2 = input.readLong();
>>>
>>>         int bin1Len = input.readInt();
>>>         bin1 = input.readBytes(bin1Len);
>>>
>>>         int bin2Len = input.readInt();
>>>         bin2 = input.readBytes(bin2Len);
>>>     }
>>>
>>>     // Serialize two longs, then each byte array prefixed by its length.
>>>     @Override
>>>     public void write(Kryo kryo, Output output) {
>>>         output.writeLong(id1);
>>>         output.writeLong(id2);
>>>
>>>         output.writeInt(bin1.length);
>>>         output.write(bin1);
>>>
>>>         output.writeInt(bin2.length);
>>>         output.write(bin2);
>>>     }
>>> }
>>>
>>
>
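
A receiving-side copy constructor like SimplePojo's isolates the receiver only if it copies the byte arrays themselves, not just their references. The hypothetical `BinHolder` below contrasts the two. Even a deep copy only helps if nothing modifies the source while the copy is being taken. That is consistent with the warning in this thread that the copy workaround can still race. Not reusing objects on the emitting side avoids the problem altogether.

```java
// Hypothetical cut-down SimplePojo with a single byte array, used only to
// contrast a shallow copy with a deep copy.
final class BinHolder {
    final byte[] bin;

    BinHolder(byte[] bin) {
        this.bin = bin;
    }

    // Shallow: copies the reference, so both objects share one array.
    static BinHolder shallowCopy(BinHolder source) {
        return new BinHolder(source.bin);
    }

    // Deep: copies the contents, so later writes to source.bin are not seen.
    static BinHolder deepCopy(BinHolder source) {
        return new BinHolder(source.bin.clone());
    }
}
```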
