Thanks Nathan.

Sorry for the confusion. Bolt_A actually generates a new SimplePojo object
in every execute() function. So I shouldn't be reusing the same pojo object
in Bolt_A.

To give you more details, from the same worker log file I am seeing
(without the copy workaround):

t=1: Bolt_A generated pojo_1 and completed collector.emit(new
Values(pojo_1))

t=2: Bolt_B started.
   - Obtained pojo through tuple: SimplePojo pojo = (SimplePojo)
tuple.getValue(0);
   - Printed out pojo to have values of pojo_1
   - Continued running execute()

t=3: Bolt_A generated pojo_2 and completed collector.emit(new
Values(pojo_2))

t=4: Bolt_B printed out the values of pojo again before the end of
execute(). It however has the values of pojo_2.

t=5: Bolt_B completed executed()

I checked the worker log files in other hosts and verified that pojo_2 got
emitted to different host/worker.


Would you kindly share the way to safely pass objects between bolts? Many
thanks. I would really appreciate more info on this.

On Thu, May 14, 2015 at 8:15 PM, Nathan Leung <[email protected]> wrote:

> It sounds like you reuse the same simple pojo object in bolt a. You should
> avoid doing this; even with your copy workaround it's possible to run into
> more subtle race conditions.
> On May 14, 2015 8:51 PM, "Banias H" <[email protected]> wrote:
>
>> In prototyping an application, I use a simple pojo (see below) to send
>> data between bolts. In an earlier version of my bolt, I would simply get
>> the pojo using tuple.getValue() inside the execute() function, like:
>>
>> SimplePojo pojo = (SimplePojo) tuple.getValue(0);
>>
>> But when I scale up and have many executors running on the same worker, I
>> run into a data issue that the pojo would have different values within
>> execute() function. Upon closer look, it seems like when the same worker
>> ran many of the two different bolts, it would do something like:
>>
>> t=1: Run Bolt_A with pojo_1
>> t=2: Run Bolt_B with pojo_1 and run Bolt_A with pojo_2.
>>
>> In my topology, Bolt_B runs substantially longer. At t=2, when I printed
>> out the values of pojo at the beginning of Bolt_B's execute(), it had
>> values in pojo_1. Then before the end of Bolt_B's execute() when I printed
>> values of pojo again, it had values in pojo_2. In other words, it is like
>> the pojo pointed to different SimplePojo within the execute() function.
>>
>> So I ended up putting a copy constructor in SimplePojo and create a new
>> SimplePojo inside execute() function, like:
>>
>> SimplePojo sources = new SimplePojo((SimplePojo) tuple.getValue(0));
>>
>> After that I no longer encounter the data issue above. So my question
>> is... should I always make a copy of the data in Tuple in bolt?
>>
>> Thanks,
>> B
>>
>>
>>
>> public class SimplePojo implements KryoSerializable {
>>  private long id1;
>> private long id2;
>> private byte[] bin1;
>> private byte[] bin2;
>>
>> public SimplePojo(SimplePojo source) {
>> id1 = source.getId1();
>> id2 = source.getId2();
>> bin1 = source.getBin1();
>> bin2 = source.getBin2();
>> }
>>
>> //getters
>> public long getId1() {
>> return id1;
>> }
>>
>> public long getId2() {
>> return id2;
>> }
>>
>> public byte[] getBin1() {
>> return bin1;
>> }
>>
>> public byte[] getBin2() {
>> return bin2;
>> }
>>
>> ...
>>
>> @Override
>> public void read(Kryo kryo, Input input) {
>> id1 = input.readLong();
>> id2 = input.readLong();
>>
>> int bin1Len = input.readInt();
>> bin1 = input.readBytes(bin1Len); //Line 156
>>
>> int bin2Len = input.readInt();
>> bin2 = input.readBytes(bin2Len);
>> }
>>
>> @Override
>> public void write(Kryo kryo, Output output) {
>> output.writeLong(id1);
>> output.writeLong(id2);
>>
>> output.writeInt(bin1.length);
>> output.write(bin1);
>>
>> output.writeInt(bin2.length);
>> output.write(bin2);
>> }
>> }
>>
>

Reply via email to