Thanks, Nathan, and sorry for the confusion. Bolt_A actually creates a new SimplePojo object in every execute() call, so I don't believe I am reusing the same pojo object in Bolt_A.
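For context on why reuse matters: as far as I understand, when two executors share a worker, Storm hands the downstream bolt a reference to the emitted object rather than a serialized copy, so any later mutation by the producer is visible to the consumer. Below is a minimal, single-threaded sketch of that aliasing, using a plain `java.util.Queue` as a stand-in for the in-worker transfer queue. `ReuseVsFreshDemo` and `Pojo` are hypothetical names, not Storm APIs, and the real case is concurrent rather than sequential.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class ReuseVsFreshDemo {
    // Hypothetical stand-in for SimplePojo; one mutable field is enough to show aliasing.
    static final class Pojo {
        long id1;
        Pojo(long id1) { this.id1 = id1; }
    }

    // Producer reuses one instance; the queue holds references, not copies.
    public static long[] reuseOneInstance() {
        Queue<Pojo> queue = new ArrayDeque<>();
        Pojo shared = new Pojo(1);
        queue.add(shared);   // "emit" pojo_1
        shared.id1 = 2;      // producer overwrites the same object for its next tuple
        queue.add(shared);   // "emit" pojo_2
        return new long[] { queue.remove().id1, queue.remove().id1 };
    }

    // Producer allocates a new instance per emit, as Bolt_A is described as doing.
    public static long[] freshInstancePerEmit() {
        Queue<Pojo> queue = new ArrayDeque<>();
        queue.add(new Pojo(1));
        queue.add(new Pojo(2));
        return new long[] { queue.remove().id1, queue.remove().id1 };
    }

    public static void main(String[] args) {
        long[] reused = reuseOneInstance();
        long[] fresh = freshInstancePerEmit();
        System.out.println("reused: " + reused[0] + ", " + reused[1]); // reused: 2, 2
        System.out.println("fresh: " + fresh[0] + ", " + fresh[1]);    // fresh: 1, 2
    }
}
```

If Bolt_A really does allocate a new SimplePojo per tuple, the same reasoning would still apply to the byte[] buffers passed into it: if those arrays are reused between tuples, two otherwise distinct pojos would share them.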
To give you more details, here is what I see in the same worker log file (without the copy workaround):

t=1: Bolt_A generated pojo_1 and completed collector.emit(new Values(pojo_1))
t=2: Bolt_B started.
     - Obtained pojo through the tuple: SimplePojo pojo = (SimplePojo) tuple.getValue(0);
     - Printed out pojo, which had the values of pojo_1
     - Continued running execute()
t=3: Bolt_A generated pojo_2 and completed collector.emit(new Values(pojo_2))
t=4: Bolt_B printed out the values of pojo again before the end of execute(). However, it now had the values of pojo_2.
t=5: Bolt_B completed execute()

I checked the worker log files on the other hosts and verified that pojo_2 was emitted to a different host/worker.

Could you share the recommended way to safely pass objects between bolts? I would really appreciate more info on this.

Many thanks.

On Thu, May 14, 2015 at 8:15 PM, Nathan Leung wrote:

> It sounds like you reuse the same simple pojo object in Bolt_A. You should
> avoid doing this; even with your copy workaround it's possible to run into
> more subtle race conditions.
> On May 14, 2015 8:51 PM, "Banias H" wrote:
>
>> In prototyping an application, I use a simple pojo (see below) to send
>> data between bolts. In an earlier version of my bolt, I would simply get
>> the pojo using tuple.getValue() inside the execute() function, like:
>>
>> SimplePojo pojo = (SimplePojo) tuple.getValue(0);
>>
>> But when I scale up and have many executors running on the same worker, I
>> run into a data issue where the pojo has different values within a single
>> execute() call. On closer inspection, it seems that when the same worker
>> runs many instances of the two different bolts, it does something like:
>>
>> t=1: Run Bolt_A with pojo_1
>> t=2: Run Bolt_B with pojo_1 and run Bolt_A with pojo_2.
>>
>> In my topology, Bolt_B runs substantially longer. At t=2, when I printed
>> out the values of pojo at the beginning of Bolt_B's execute(), it had the
>> values of pojo_1. Then, before the end of Bolt_B's execute(), when I
>> printed the values of pojo again, it had the values of pojo_2. In other
>> words, it is as if pojo pointed to a different SimplePojo partway through
>> the execute() function.
>>
>> So I ended up adding a copy constructor to SimplePojo and creating a new
>> SimplePojo inside the execute() function, like:
>>
>> SimplePojo sources = new SimplePojo((SimplePojo) tuple.getValue(0));
>>
>> After that I no longer encounter the data issue above. So my question
>> is: should I always make a copy of the data in a Tuple in a bolt?
>>
>> Thanks,
>> B
>>
>>
>>
>> import com.esotericsoftware.kryo.Kryo;
>> import com.esotericsoftware.kryo.KryoSerializable;
>> import com.esotericsoftware.kryo.io.Input;
>> import com.esotericsoftware.kryo.io.Output;
>>
>> public class SimplePojo implements KryoSerializable {
>>     private long id1;
>>     private long id2;
>>     private byte[] bin1;
>>     private byte[] bin2;
>>
>>     // Copy constructor. Note: bin1/bin2 are copied by reference, so the
>>     // copy still shares its byte arrays with the source.
>>     public SimplePojo(SimplePojo source) {
>>         id1 = source.getId1();
>>         id2 = source.getId2();
>>         bin1 = source.getBin1();
>>         bin2 = source.getBin2();
>>     }
>>
>>     // getters
>>     public long getId1() {
>>         return id1;
>>     }
>>
>>     public long getId2() {
>>         return id2;
>>     }
>>
>>     public byte[] getBin1() {
>>         return bin1;
>>     }
>>
>>     public byte[] getBin2() {
>>         return bin2;
>>     }
>>
>>     ...
>>
>>     @Override
>>     public void read(Kryo kryo, Input input) {
>>         id1 = input.readLong();
>>         id2 = input.readLong();
>>
>>         // Each array is length-prefixed by write() below.
>>         int bin1Len = input.readInt();
>>         bin1 = input.readBytes(bin1Len);
>>
>>         int bin2Len = input.readInt();
>>         bin2 = input.readBytes(bin2Len);
>>     }
>>
>>     @Override
>>     public void write(Kryo kryo, Output output) {
>>         output.writeLong(id1);
>>         output.writeLong(id2);
>>
>>         output.writeInt(bin1.length);
>>         output.write(bin1);
>>
>>         output.writeInt(bin2.length);
>>         output.write(bin2);
>>     }
>> }
>>
>
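On Nathan's point that even a copy can race: the copy constructor in SimplePojo shares its byte[] arrays with the source, and copying on the consumer side still reads an object the producer may be writing at the same time. One commonly used alternative is to emit an effectively immutable object and never touch it after emit(). The following is a minimal sketch, assuming a hypothetical `ImmutablePojo` with the same fields as SimplePojo. Kryo serialization is omitted, and because `KryoSerializable.read()` assigns fields after construction, final fields like these would need a separate Kryo `Serializer` registered for the class instead.

```java
import java.util.Arrays;

// Hypothetical immutable variant of SimplePojo; Kryo serialization is omitted.
public final class ImmutablePojo {
    private final long id1;
    private final long id2;
    private final byte[] bin1;
    private final byte[] bin2;

    public ImmutablePojo(long id1, long id2, byte[] bin1, byte[] bin2) {
        this.id1 = id1;
        this.id2 = id2;
        // Defensive copies: later writes to the caller's arrays cannot leak in.
        this.bin1 = Arrays.copyOf(bin1, bin1.length);
        this.bin2 = Arrays.copyOf(bin2, bin2.length);
    }

    public long getId1() { return id1; }
    public long getId2() { return id2; }

    // Return copies so a consumer cannot mutate the shared instance either.
    public byte[] getBin1() { return Arrays.copyOf(bin1, bin1.length); }
    public byte[] getBin2() { return Arrays.copyOf(bin2, bin2.length); }

    public static void main(String[] args) {
        byte[] buffer = {1, 2, 3};
        ImmutablePojo pojo = new ImmutablePojo(10L, 20L, buffer, new byte[0]);
        buffer[0] = 99;          // producer reuses its buffer for the next tuple
        pojo.getBin1()[1] = 42;  // consumer writes into a getter result (a copy)
        System.out.println(Arrays.toString(pojo.getBin1())); // [1, 2, 3]
    }
}
```

The defensive copies cost an allocation per array, which is the trade-off for not having to reason about who else holds a reference to the buffers.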
