Hello.
I encountered an error in the following situation. I use
topology.max.spout.pending = 20, so there can be multiple concurrent Trident
transactions. My custom spout implements IPartitionedTridentSpout. I added some
partitions in code and resubmitted the topology. I then saw that one partition
("telemetry,chronos") emitted data from the same offset twice: first as part of
transaction 10544171:1 in thread #48 of worker 28219, then as part of
transaction 10544172:1 in thread #50 of worker 28220. After the resubmit,
thread #48 was scheduled to process a different set of partitions, and
partition "telemetry,chronos" was "moved" to thread #50 of worker 28220. As a
result, the data was processed twice:
2015-06-10 12:24:56 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #48
(Thread-19-spout0) of process 28219@el7-makeev: emitPartitionBatchNew:
partition=telemetry,chronos tx=10544171:1 cur pos:
{"fileName":"telemetry-2015-06-10-12_00001","offset":627325755,"length":0} (of
class: class org.json.simple.JSONObject)
2015-06-10 12:24:58 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #48
(Thread-19-spout0) of process 28219@el7-makeev: partition=telemetry,chronos
tx=10544171:1 flushed 14873 tuples, new pos:
{fileName=telemetry-2015-06-10-12_00001, offset=627325755, length=8388607}
2015-06-10 12:24:56 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #50
(Thread-21-spout0) of process 28220@el7-makeev: emitPartitionBatchNew:
partition=telemetry,chronos tx=10544172:1 cur pos:
{"fileName":"telemetry-2015-06-10-12_00001","offset":627325755,"length":0} (of
class: class org.json.simple.JSONObject)
2015-06-10 12:24:57 r.m.g.w.s.r.t.MTStreamSpout$Emitter [INFO] thread #50
(Thread-21-spout0) of process 28220@el7-makeev: partition=telemetry,chronos
tx=10544172:1 flushed 14873 tuples, new pos:
{fileName=telemetry-2015-06-10-12_00001, offset=627325755, length=8388607}
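For context, this is my understanding of the per-partition metadata contract
(a minimal sketch with made-up names, NOT Storm's actual classes): for each
partition, emitPartitionBatchNew receives the metadata saved by the previous
successful transaction and returns new metadata that Trident persists to
ZooKeeper. In the healthy case the flow is strictly sequential, so each
transaction starts from where the previous one ended:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the expected per-partition metadata flow.
// Hypothetical names; this is an illustration, not Storm code.
public class SequentialMetaFlow {

    // Stands in for the per-partition state Trident keeps in ZooKeeper.
    static final Map<String, Long> zkOffsets = new HashMap<>();

    // Models emitPartitionBatchNew: read the last saved offset, emit a batch,
    // return the new offset for Trident to persist.
    static long emitPartitionBatchNew(String partition, long batchLength) {
        long lastOffset = zkOffsets.getOrDefault(partition, 0L);
        // ... emit tuples covering [lastOffset, lastOffset + batchLength) ...
        long newOffset = lastOffset + batchLength;
        zkOffsets.put(partition, newOffset); // meta saved after the batch
        return newOffset;
    }

    public static void main(String[] args) {
        String p = "telemetry,chronos";
        zkOffsets.put(p, 627325755L);                       // saved by tx 10544170
        long afterTx1 = emitPartitionBatchNew(p, 8388607L); // tx 10544171
        long afterTx2 = emitPartitionBatchNew(p, 8388607L); // tx 10544172
        // Each tx starts where the previous one ended: no duplicates.
        System.out.println(afterTx1); // 635714362
        System.out.println(afterTx2); // 644102969
    }
}
```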
Clearly, partitions were rebalanced across tasks because I added partitions.
Could there be a race condition in
PartitionedTridentSpoutExecutor$Emitter.emitBatch when there are multiple
concurrent Trident transactions? E.g., tx 10544171 received the old partition
list from the coordinator and used offset 627325755 (which had been saved to
ZooKeeper by tx 10544170), but had not yet saved its new offset to the state
in ZooKeeper, while tx 10544172, working with the new partition list,
concurrently read the partition's state from ZooKeeper (still the offset of
tx 10544170). So 10544171 and 10544172 both saw the same offset.
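If that hypothesis is right, the interleaving would look like the sketch below
(a deterministic stdlib-only simulation with hypothetical names, not Storm
code): both transactions read the offset committed by tx 10544170 before
either one writes its own offset back, so both batches are emitted from the
same position.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simulates the suspected interleaving under topology.max.spout.pending > 1.
// Hypothetical names; sequential code standing in for two spout threads.
public class SpoutMetaRace {

    // Returns the offset each transaction emitted from.
    static long[] runInterleaving(long savedOffset, long batchLength) {
        AtomicLong zkOffset = new AtomicLong(savedOffset); // saved by tx N-1

        long readByTx1 = zkOffset.get(); // tx 10544171 (thread #48) reads state
        long readByTx2 = zkOffset.get(); // tx 10544172 (thread #50) reads state
                                         // BEFORE tx 10544171 commits

        zkOffset.set(readByTx1 + batchLength); // tx 10544171 commits its meta
        zkOffset.set(readByTx2 + batchLength); // tx 10544172 commits its meta

        return new long[]{readByTx1, readByTx2};
    }

    public static void main(String[] args) {
        long[] emittedFrom = runInterleaving(627325755L, 8388607L);
        // Both batches start at the same offset: the data is duplicated.
        System.out.println(emittedFrom[0]); // 627325755
        System.out.println(emittedFrom[1]); // 627325755
    }
}
```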
--
Best regards
Aleksey Makeyev