Hello.

 I encountered an error in the following situation. I use 
topology.max.spout.pending = 20, so there could be multiple concurrent trident 
transactions. My custom spout implements IPartitionedTridentSpout. I added some 
partitions at code level and resubmitted topology. Then I saw that one 
partition ("telemetry,chronos") emitted data from the same offset: first time 
as part of transaction 10544171:1 in thread #48 of worker 28219, then as part 
of transaction 10544172:1 in thread #50 of worker 28220. After this thread #48 
was scheduled to process another set of partitions and partition 
"telemetry,chronos" was "moved" to thread #50 of worker 28220. As a result, 
data was processed twice:

 2015-06-10 12:24:56 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #48 
(Thread-19-spout0) of process 28219@el7-makeev: emitPartitionBatchNew: 
partition=telemetry,chronos tx=10544171:1 cur pos: 
{"fileName":"telemetry-2015-06-10-12_00001","offset":627325755,"length":0} (of 
class: class org.json.simple.JSONObject)
 2015-06-10 12:24:58 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #48 
(Thread-19-spout0) of process 28219@el7-makeev: partition=telemetry,chronos 
tx=10544171:1 flushed 14873 tuples, new pos: 
{fileName=telemetry-2015-06-10-12_00001, offset=627325755, length=8388607}

 2015-06-10 12:24:56 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #50 
(Thread-21-spout0) of process 28220@el7-makeev: emitPartitionBatchNew: 
partition=telemetry,chronos tx=10544172:1 cur pos: 
{"fileName":"telemetry-2015-06-10-12_00001","offset":627325755,"length":0} (of 
class: class org.json.simple.JSONObject)
 2015-06-10 12:24:57 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #50 
(Thread-21-spout0) of process 28220@el7-makeev: partition=telemetry,chronos 
tx=10544172:1 flushed 14873 tuples, new pos: 
{fileName=telemetry-2015-06-10-12_00001, offset=627325755, length=8388607}

 Clearly, partitions were rebalanced across tasks, because I added some 
partitions. Could it be possible that there is a race condition in 
PartitionedTridentSpoutExecutor$Emitter.emitBatch in case of multiple 
concurrent trident transactions? e.g. tx 10544171 received old partitions from 
the coordinator and used offset 627325755 (which was also saved to zookeeper by 
tx 10544170), and has not yet saved new offset to state in zookeeper. But tx 
10544172 with new partitions has concurrently read state of partition from 
zookeeper (which is still offset of tx 10544170) - 10544171 and 10544172 saw 
the same offset.

 -- 
 Best regards
 Aleksey Makeyev

Reply via email to