Here are additional logs. The system is losing the messages that carry the
expected tuple count (tracked.expectedTupleCount) and replays the lost ones
only after 30 seconds (on the 35-second __system tick). The max spout pending
parameter has an indirect influence on this delay. If I make it too small,
the batch is replayed indefinitely.

My guess is that the expected tuple count is sent in the finishBatch method
of TridentBoltExecutor on a "$coord-" stream (:263):
    _collector.emitDirect(task, stream, finishTuple,
        new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));

and another instance of the same class receives it in its execute method (:371):
    } else if(t==TupleType.COORD) {
        int count = tuple.getInteger(1);
        tracked.reportedTasks++;
        tracked.expectedTupleCount+=count;
        checkFinish(tracked, tuple, t);
    } else { // s1 || spoutcoord
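
To make the dependency explicit, here is a minimal standalone model of the counting described above. It is a sketch, not the actual TridentBoltExecutor code: the class names, the receivedTuples and expectedTaskReports fields, and the exact finish condition are my assumptions. Only reportedTasks and expectedTupleCount come from the quoted snippet. It shows why a batch cannot finish while a single "$coord-" report is missing, even if every data tuple has already been processed.

```java
public class CoordSketch {
    /** Simplified per-batch state, loosely modelled on the quoted snippet. */
    static final class TrackedBatch {
        final int expectedTaskReports; // assumption: upstream tasks that must send a coord tuple
        int reportedTasks = 0;         // from the quoted code
        int expectedTupleCount = 0;    // from the quoted code
        int receivedTuples = 0;        // assumption: data tuples seen so far for this batch

        TrackedBatch(int expectedTaskReports) {
            this.expectedTaskReports = expectedTaskReports;
        }

        /** A regular data tuple of the batch arrived. */
        void onDataTuple() {
            receivedTuples++;
        }

        /** A coord tuple arrived, carrying how many tuples one upstream task emitted to us. */
        void onCoordTuple(int count) {
            reportedTasks++;
            expectedTupleCount += count;
        }

        /** Assumed finish condition: every upstream task reported and all announced tuples arrived. */
        boolean isFinished() {
            return reportedTasks == expectedTaskReports
                && receivedTuples == expectedTupleCount;
        }
    }

    public static void main(String[] args) {
        TrackedBatch batch = new TrackedBatch(2); // two upstream tasks (hypothetical)
        for (int i = 0; i < 10; i++) {
            batch.onDataTuple();
        }
        batch.onCoordTuple(10); // first task reports 10 tuples
        System.out.println("after first report:  " + batch.isFinished());
        batch.onCoordTuple(0);  // the delayed report from the second task
        System.out.println("after second report: " + batch.isFinished());
    }
}
```

In this model the first line printed is false and the second is true: until the delayed coord tuple arrives, the batch stays open, which matches the waiting seen in the logs below.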

In the logs, at the 35-second __system tick the lost (delayed) tuples are
received on the rest of the machines, and only after that can the batch be acked.

Logs:
...
 {:time "03:45:25", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("17023:0" "10"), :source "spout0:15", :stream "$coord-bg0"}
 {:time "03:45:25", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("17023:0" "10"), :source "spout0:15", :stream "$coord-bg0"}
 {:time "03:45:25", :host "comp154", :type :emitting-task,  :batch-id nil,
:tuple ("9007913989709953633 -1234738511404338705"),  :source "b-0",
 :stream "__ack_ack"}
 {:time "03:45:25",  :host "comp154",  :type :emitting-task,  :batch-id
nil,  :tuple ("9007913989709953633 5341692312880547649"),  :source "b-0",
 :stream "__ack_ack"}
 {:time "03:45:25", :host "comp154", :type :received, :batch-id nil, :tuple
("9007913989709953633 5341692312880547649"), :source "b-0:14", :stream
"__ack_ack"}
 {:time "03:45:25", :host "comp154", :type :boltack, :batch-id nil, :tuple
("9007913989709953633 5341692312880547649"), :source "b-0:14", :stream
"__ack_ack"}
 {:time "03:45:25",  :host "comp154",  :type :execute-done,  :batch-id nil,
 :tuple ("9007913989709953633 -1234738511404338705"),  :source "b-0:9",
 :stream "__ack_ack"}
 {:time "03:45:25",  :host "comp154",  :type :execute-done,  :batch-id nil,
 :tuple ("9007913989709953633 5341692312880547649"),  :source "b-0:14",
 :stream "__ack_ack"}
 {:time "03:45:26", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp153", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp153", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp152", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp152", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:26", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp153", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp153", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp152", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp152", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:31", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp153", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp153", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp152", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp152", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:36", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp153", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp153", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp152", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp152", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:41", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp153", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp153", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp151", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp151", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp154", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp154", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp152", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp152", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp150", :type :received, :batch-id nil, :tuple
("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:46", :host "comp150", :type :execute-done, :batch-id nil,
:tuple ("5"), :source "__system:-1", :stream "__tick"}
 {:time "03:45:51", :host "comp152", :type :received, :batch-id nil, :tuple
("17023:0" "10"), :source "spout0:15", :stream "$coord-bg0"}

2015-04-06 16:39 GMT+05:00 Vladimir Protsenko <[email protected]>:

> Hello. I am trying to benchmark a simple Trident topology: IBatch Custom
> Spout -> Bolt. Every time I start the topology I see the same issue - *a
> delay between the first and the second batch*.
>
> Legend of the picture: blue - timestamp of emitting tuple, yellow and
> green - timestamps of processing steps in Bolt, magenta - timestamp of ack
> function in Spout.
>
>
> So in this case every tuple is successfully processed, but the acknowledgment
> comes after *30 seconds*. Do you have any ideas of what is happening?
>
