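It sounds like Bolt_A reuses the same SimplePojo instance across emits. You
should avoid doing this: when both bolts run in the same worker, Storm hands
the emitted object to Bolt_B by reference instead of serializing it, so
Bolt_B sees whatever Bolt_A writes into it afterwards. Your copy workaround
only narrows the window. The copy constructor still shares the bin1 and bin2
arrays with the original, and the copy itself can overlap with Bolt_A's
writes, so it's possible to run into more subtle race conditions.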
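
One way to rule this out is to make the object immutable and have Bolt_A
build a fresh one for every emit. Below is a minimal sketch of that idea,
not a drop-in replacement: ImmutablePojo is a hypothetical name, the field
names are borrowed from your SimplePojo, and the Kryo read()/write() methods
are left out so that it compiles without Storm or Kryo on the classpath. The
"scratch" array stands in for whatever buffer Bolt_A reuses between tuples.

```java
import java.util.Arrays;

// Hypothetical immutable variant of SimplePojo: no setters, and every
// byte[] is cloned on the way in and on the way out.
final class ImmutablePojo {
    private final long id1;
    private final long id2;
    private final byte[] bin1;
    private final byte[] bin2;

    ImmutablePojo(long id1, long id2, byte[] bin1, byte[] bin2) {
        this.id1 = id1;
        this.id2 = id2;
        this.bin1 = bin1.clone(); // defensive copy: the caller may reuse its array
        this.bin2 = bin2.clone();
    }

    long getId1() { return id1; }
    long getId2() { return id2; }
    byte[] getBin1() { return bin1.clone(); } // never hand out the internal array
    byte[] getBin2() { return bin2.clone(); }

    public static void main(String[] args) {
        byte[] scratch = {1, 2, 3}; // buffer the upstream bolt reuses
        ImmutablePojo first = new ImmutablePojo(1L, 10L, scratch, scratch);

        scratch[0] = 99; // upstream bolt starts preparing the next tuple
        ImmutablePojo second = new ImmutablePojo(2L, 20L, scratch, scratch);

        System.out.println(Arrays.toString(first.getBin1()));  // [1, 2, 3]
        System.out.println(Arrays.toString(second.getBin1())); // [99, 2, 3]
    }
}
```

With final fields you would probably need a separate Kryo serializer rather
than KryoSerializable, since read() can no longer assign to them. That part
is an assumption on my side and is not shown here.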
On May 14, 2015 8:51 PM, "Banias H" wrote:
> In prototyping an application, I use a simple pojo (see below) to send
> data between bolts. In an earlier version of my bolt, I would simply get
> the pojo using tuple.getValue() inside the execute() function, like:
>
> SimplePojo pojo = (SimplePojo) tuple.getValue(0);
>
> But when I scaled up and had many executors running on the same worker, I
> ran into a data issue: the pojo's values would change within a single
> execute() call. On closer inspection, it seems that when the same worker
> ran many instances of the two different bolts, it would do something like:
>
> t=1: Run Bolt_A with pojo_1
> t=2: Run Bolt_B with pojo_1 and run Bolt_A with pojo_2.
>
> In my topology, Bolt_B runs substantially longer. At t=2, when I printed
> the values of the pojo at the beginning of Bolt_B's execute(), it had the
> values of pojo_1. When I printed them again just before the end of Bolt_B's
> execute(), it had the values of pojo_2. In other words, it is as if the
> pojo pointed to a different SimplePojo partway through execute().
>
> So I ended up adding a copy constructor to SimplePojo and creating a new
> SimplePojo inside the execute() function, like:
>
> SimplePojo sources = new SimplePojo((SimplePojo) tuple.getValue(0));
>
> After that I no longer encountered the data issue above. So my question
> is: should I always make a copy of the data in a Tuple inside a bolt?
>
> Thanks,
> B
>
>
>
> public class SimplePojo implements KryoSerializable {
>     private long id1;
>     private long id2;
>     private byte[] bin1;
>     private byte[] bin2;
>
>     public SimplePojo(SimplePojo source) {
>         id1 = source.getId1();
>         id2 = source.getId2();
>         bin1 = source.getBin1();
>         bin2 = source.getBin2();
>     }
>
>     //getters
>     public long getId1() {
>         return id1;
>     }
>
>     public long getId2() {
>         return id2;
>     }
>
>     public byte[] getBin1() {
>         return bin1;
>     }
>
>     public byte[] getBin2() {
>         return bin2;
>     }
>
>     ...
>
>     @Override
>     public void read(Kryo kryo, Input input) {
>         id1 = input.readLong();
>         id2 = input.readLong();
>
>         int bin1Len = input.readInt();
>         bin1 = input.readBytes(bin1Len); //Line 156
>
>         int bin2Len = input.readInt();
>         bin2 = input.readBytes(bin2Len);
>     }
>
>     @Override
>     public void write(Kryo kryo, Output output) {
>         output.writeLong(id1);
>         output.writeLong(id2);
>
>         output.writeInt(bin1.length);
>         output.write(bin1);
>
>         output.writeInt(bin2.length);
>         output.write(bin2);
>     }
> }
>