Thanks Nathan. Your info is appreciated.

On Fri, May 15, 2015 at 6:36 AM, Nathan Leung wrote:
> You should not get this if you're creating a new object every time. The
> rule of thumb is that data in tuples should be considered immutable; you
> can break the rule, but it requires care, otherwise you will encounter
> subtle problems.
>
> Given your timeline, this only makes sense if you reuse an object somehow.
>
> As a note, when Storm passes a tuple in process it passes it by reference,
> but when it goes over the network the tuple is serialized. This is why you
> cannot change data in a tuple once it has been sent: the downstream bolt
> could already have started processing it. If you have any fan-out, any
> other bolts that received this tuple could be using it too.
> On May 15, 2015 1:21 AM, "Banias H" wrote:
>
>> Thanks Nathan.
>>
>> Sorry for the confusion. Bolt_A actually generates a new SimplePojo
>> object in every execute() call, so I shouldn't be reusing the same pojo
>> object in Bolt_A.
>>
>> To give you more details, this is what I see in the same worker log file
>> (without the copy workaround):
>>
>> t=1: Bolt_A generated pojo_1 and completed collector.emit(new
>> Values(pojo_1))
>>
>> t=2: Bolt_B started.
>> - Obtained pojo through tuple: SimplePojo pojo = (SimplePojo)
>> tuple.getValue(0);
>> - Printed out pojo, which had the values of pojo_1
>> - Continued running execute()
>>
>> t=3: Bolt_A generated pojo_2 and completed collector.emit(new
>> Values(pojo_2))
>>
>> t=4: Bolt_B printed out the values of pojo again before the end of
>> execute(). However, it had the values of pojo_2.
>>
>> t=5: Bolt_B completed execute()
>>
>> I checked the worker log files on the other hosts and verified that pojo_2
>> was emitted to a different host/worker.
>>
>>
>> Would you kindly share how to safely pass objects between bolts? Many
>> thanks. I would really appreciate more info on this.
>>
>> On Thu, May 14, 2015 at 8:15 PM, Nathan Leung wrote:
>>
>>> It sounds like you reuse the same SimplePojo object in Bolt_A.
>>> You should avoid doing this; even with your copy workaround it's possible
>>> to run into more subtle race conditions.
>>> On May 14, 2015 8:51 PM, "Banias H" wrote:
>>>
>>>> In prototyping an application, I use a simple pojo (see below) to send
>>>> data between bolts. In an earlier version of my bolt, I would simply get
>>>> the pojo using tuple.getValue() inside the execute() function, like:
>>>>
>>>> SimplePojo pojo = (SimplePojo) tuple.getValue(0);
>>>>
>>>> But when I scale up and have many executors running on the same worker,
>>>> I run into a data issue: the pojo has different values within a single
>>>> execute() call. Upon closer look, it seems that when the same worker
>>>> runs many executors of the two different bolts, it does something like:
>>>>
>>>> t=1: Run Bolt_A with pojo_1
>>>> t=2: Run Bolt_B with pojo_1 and run Bolt_A with pojo_2.
>>>>
>>>> In my topology, Bolt_B runs substantially longer. At t=2, when I
>>>> printed out the values of pojo at the beginning of Bolt_B's execute(), it
>>>> had the values of pojo_1. Then, before the end of Bolt_B's execute(), when
>>>> I printed the values of pojo again, it had the values of pojo_2. In other
>>>> words, it was as if pojo pointed to a different SimplePojo partway
>>>> through the execute() function.
>>>>
>>>> So I ended up adding a copy constructor to SimplePojo and creating a new
>>>> SimplePojo inside the execute() function, like:
>>>>
>>>> SimplePojo sources = new SimplePojo((SimplePojo) tuple.getValue(0));
>>>>
>>>> After that I no longer encounter the data issue above. So my question
>>>> is... should I always make a copy of the tuple data in a bolt?
>>>>
>>>> Thanks,
>>>> B
>>>>
>>>>
>>>>
>>>> import com.esotericsoftware.kryo.Kryo;
>>>> import com.esotericsoftware.kryo.KryoSerializable;
>>>> import com.esotericsoftware.kryo.io.Input;
>>>> import com.esotericsoftware.kryo.io.Output;
>>>>
>>>> public class SimplePojo implements KryoSerializable {
>>>>     private long id1;
>>>>     private long id2;
>>>>     private byte[] bin1;
>>>>     private byte[] bin2;
>>>>
>>>>     // Copy constructor. The byte arrays are cloned; assigning them
>>>>     // directly would leave the copy sharing mutable arrays with the source.
>>>>     public SimplePojo(SimplePojo source) {
>>>>         id1 = source.getId1();
>>>>         id2 = source.getId2();
>>>>         bin1 = source.getBin1().clone();
>>>>         bin2 = source.getBin2().clone();
>>>>     }
>>>>
>>>>     // getters
>>>>     public long getId1() {
>>>>         return id1;
>>>>     }
>>>>
>>>>     public long getId2() {
>>>>         return id2;
>>>>     }
>>>>
>>>>     public byte[] getBin1() {
>>>>         return bin1;
>>>>     }
>>>>
>>>>     public byte[] getBin2() {
>>>>         return bin2;
>>>>     }
>>>>
>>>>     ...
>>>>
>>>>     // Kryo deserialization: mirrors write() below, reading each array
>>>>     // as a length prefix followed by that many bytes.
>>>>     @Override
>>>>     public void read(Kryo kryo, Input input) {
>>>>         id1 = input.readLong();
>>>>         id2 = input.readLong();
>>>>
>>>>         int bin1Len = input.readInt();
>>>>         bin1 = input.readBytes(bin1Len);
>>>>
>>>>         int bin2Len = input.readInt();
>>>>         bin2 = input.readBytes(bin2Len);
>>>>     }
>>>>
>>>>     // Kryo serialization: two longs, then each array length-prefixed.
>>>>     @Override
>>>>     public void write(Kryo kryo, Output output) {
>>>>         output.writeLong(id1);
>>>>         output.writeLong(id2);
>>>>
>>>>         output.writeInt(bin1.length);
>>>>         output.write(bin1);
>>>>
>>>>         output.writeInt(bin2.length);
>>>>         output.write(bin2);
>>>>     }
>>>> }
>>>>
>>>
>>
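To make Nathan's point about in-process tuples concrete, here is a minimal, Storm-free sketch in plain Java. It is only an analogy under stated assumptions: `MutablePojo`, `emitInProcess`, `emitOverNetwork` and `receiveFromNetwork` are hypothetical names, a `List` stands in for the in-worker transfer between executors, and a `ByteBuffer` snapshot stands in for Kryo serialization between workers. It shows why reusing one object after emitting it produces exactly the "values changed mid-execute()" symptom on the same worker, while a tuple that was serialized at send time is unaffected.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SharedReferenceDemo {
    // Hypothetical mutable payload standing in for SimplePojo.
    static final class MutablePojo {
        long id;
        MutablePojo(long id) { this.id = id; }
    }

    // Analogy for an in-process emit: the receiver gets the same reference.
    static void emitInProcess(List<MutablePojo> queue, MutablePojo pojo) {
        queue.add(pojo);
    }

    // Analogy for a cross-worker emit: the state is serialized at send time.
    static byte[] emitOverNetwork(MutablePojo pojo) {
        return ByteBuffer.allocate(Long.BYTES).putLong(pojo.id).array();
    }

    // The receiving worker deserializes an independent object.
    static MutablePojo receiveFromNetwork(byte[] bytes) {
        return new MutablePojo(ByteBuffer.wrap(bytes).getLong());
    }

    public static void main(String[] args) {
        List<MutablePojo> queue = new ArrayList<>();

        // The sender (wrongly) reuses one object across two "execute()" calls.
        MutablePojo reused = new MutablePojo(1);
        emitInProcess(queue, reused);
        byte[] wire = emitOverNetwork(reused);

        // The receiver starts working on the first tuple...
        MutablePojo local = queue.get(0);
        long before = local.id;

        // ...while the sender overwrites the same object for the next tuple.
        reused.id = 2;
        emitInProcess(queue, reused);

        long after = local.id;                       // same reference, now 2
        long remote = receiveFromNetwork(wire).id;   // snapshot, still 1

        System.out.println("in-process before=" + before + " after=" + after);
        System.out.println("over network=" + remote);
    }
}
```

Running `main` prints `in-process before=1 after=2` followed by `over network=1`. If Bolt_A truly allocates a new object per `execute()` and never touches it after `emit`, this sharing cannot happen, which is why Nathan suspects reuse somewhere.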

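One way to follow the "treat tuple data as immutable" rule of thumb, rather than copying on every receive, is to make the payload itself immutable. The sketch below is a hypothetical `ImmutablePojo` with the same fields as `SimplePojo`; it is not Storm API, just plain Java. Its fields are `final` and each `byte[]` is defensively copied both when the object is built and when a getter returns it, so neither the emitting bolt nor any fan-out receiver can change what another bolt is reading. Note that the copy constructor in `SimplePojo` above only protects the arrays if it clones them; assigning `source.getBin1()` directly still shares them.

```java
import java.util.Arrays;

// Hypothetical immutable variant of SimplePojo (illustration only).
public final class ImmutablePojo {
    private final long id1;
    private final long id2;
    private final byte[] bin1;
    private final byte[] bin2;

    public ImmutablePojo(long id1, long id2, byte[] bin1, byte[] bin2) {
        this.id1 = id1;
        this.id2 = id2;
        // Defensive copies in: later changes to the caller's arrays are not seen.
        this.bin1 = bin1.clone();
        this.bin2 = bin2.clone();
    }

    public long getId1() { return id1; }

    public long getId2() { return id2; }

    // Defensive copies out: callers cannot mutate the internal arrays.
    public byte[] getBin1() { return bin1.clone(); }

    public byte[] getBin2() { return bin2.clone(); }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        ImmutablePojo pojo = new ImmutablePojo(7L, 8L, payload, new byte[0]);

        // Neither the source array nor a returned array affects the object.
        payload[0] = 99;
        pojo.getBin1()[1] = 42;

        System.out.println(Arrays.toString(pojo.getBin1()));  // [1, 2, 3]
    }
}
```

The trade-off is an extra array copy on construction and on every getter call. Also, `final` fields do not fit `KryoSerializable.read()`, which fills fields after construction; a custom Kryo `Serializer` that calls the constructor is one way around that, not shown here.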