I've hit a very strange problem with one of our topologies. After deploying to a
clustered environment, our Trident topology isn't running the Emitter at all
when the Coordinator returns true from isReady(): the log message at the very
start of the method never appears in the worker logs. getOrderedPartitions and
refreshPartitions are called, but emitPartitionBatchNew is not.
This is the output of the debug logs when an event is sent to the Kafka topic:
2014-05-09 12:19:57 u.c.f.t.k.t.TridentKafkaSpout$Coordinator [DEBUG] Spout
ready to read from EventsIn at offset 253
2014-05-09 12:19:57 u.c.f.t.k.t.TridentKafkaSpout$Coordinator [DEBUG]
Coordinator triggering batch 171
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 $batch [171:0]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
$mastercoord-bg0:1, stream: $batch, id:
{-8335840769555710216=-5840416573241105698}, [171:0]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 __ack_init
[-8335840769555710216 -5840416573241105698 1]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
$mastercoord-bg0:1, stream: __ack_init, id: {}, [-8335840769555710216
-5840416573241105698 1]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $spoutcoord-spout0 $batch
[171:0, [0]]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $spoutcoord-spout0 __ack_ack
[-8335840769555710216 -6305116143407737306]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
$spoutcoord-spout0:2, stream: $batch, id:
{-8335840769555710216=472058086231717112}, [171:0, [0]]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
$spoutcoord-spout0:2, stream: __ack_ack, id: {}, [-8335840769555710216
-6305116143407737306]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting direct: 4; spout0 $coord-bg0
[171:0, 0]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: spout0 __ack_ack
[-8335840769555710216 -7843298785019326174]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
spout0:5, stream: $coord-bg0, id: {-8335840769555710216=-7662286408926976550},
[171:0, 0]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting:
b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output
__ack_ack [-8335840769555710216 -7662286408926976550]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
spout0:5, stream: __ack_ack, id: {}, [-8335840769555710216 -7843298785019326174]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output:4,
stream: __ack_ack, id: {}, [-8335840769555710216 -7662286408926976550]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack
[-8335840769555710216]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
__acker:3, stream: __ack_ack, id: {}, [-8335840769555710216]
2014-05-09 12:19:57 b.s.d.executor [INFO] Acking message 171:0
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 $commit [171:0]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
$mastercoord-bg0:1, stream: $commit, id:
{-4294881045221605715=-9054173560631528320}, [171:0]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting:
b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output
__ack_ack [-4294881045221605715 -9054173560631528320]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output:4,
stream: __ack_ack, id: {}, [-4294881045221605715 -9054173560631528320]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 __ack_init
[-4294881045221605715 -9054173560631528320 1]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
$mastercoord-bg0:1, stream: __ack_init, id: {}, [-4294881045221605715
-9054173560631528320 1]
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting direct: 1; __acker __ack_ack
[-4294881045221605715]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
__acker:3, stream: __ack_ack, id: {}, [-4294881045221605715]
2014-05-09 12:19:57 b.s.d.executor [INFO] Acking message 171:0
2014-05-09 12:19:57 b.s.d.task [INFO] Emitting: $mastercoord-bg0 $success
[171:0]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
$mastercoord-bg0:1, stream: $success, id: {}, [171:0]
2014-05-09 12:19:57 b.s.d.executor [INFO] Processing received message source:
$mastercoord-bg0:1, stream: $success, id: {}, [171:0]
And this is the Storm UI for the topology:
Spouts (All time):

Id               | Executors | Tasks | Emitted | Transferred | Complete latency (ms) | Acked | Failed | Last error
$mastercoord-bg0 | 1         | 1     | 20      | 20          | 0.000                 | 0     | 0      |
Bolts (All time):

Id | Executors | Tasks | Emitted | Transferred | Capacity (last 10m) | Execute latency (ms) | Executed | Process latency (ms) | Acked | Failed | Last error
$spoutcoord-spout0 | 1 | 1 | 20 | 0 | 0.000 | 0.000 | 0 | 0.000 | 0 | 0 |
__acker | 1 | 1 | 0 | 0 | 0.000 | 0.000 | 0 | 0.000 | 20 | 0 |
b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output | 1 | 1 | 20 | 20 | 0.000 | 0.000 | 120 | 0.000 | 0 | 0 |
spout0 | 1 | 1 | 20 | 20 | 0.000 | 0.000 | 120 | 0.000 | 0 | 0 |
TridentKafkaSpout is our own Kafka spout, implemented as a
PartitionedTridentSpout. The Coordinator does a long poll of the input topic and
returns true from isReady when the poll comes back with new data in the topic
(the two Coordinator debug messages at the top of the log show that this is
happening).
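For reference, here is a stripped-down, self-contained sketch of the long-poll
pattern the Coordinator follows (class and method names here are illustrative
only, not our real code, which wraps a Kafka consumer):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for the real Coordinator: a poll thread hands off
// "new data" signals, and isReady() blocks up to the poll timeout for one.
public class LongPollCoordinator {
    private final BlockingQueue<Long> newOffsets = new ArrayBlockingQueue<>(1);

    // Called by the polling side when new data appears in the topic.
    public void onNewData(long offset) {
        newOffsets.offer(offset);
    }

    // Returns true only when the long poll saw new data within the timeout.
    public boolean isReady(long pollTimeoutMs) throws InterruptedException {
        return newOffsets.poll(pollTimeoutMs, TimeUnit.MILLISECONDS) != null;
    }

    public static void main(String[] args) throws Exception {
        LongPollCoordinator c = new LongPollCoordinator();
        c.onNewData(253L);
        System.out.println(c.isReady(100)); // true: new data was signalled
        System.out.println(c.isReady(100)); // false: nothing new, poll timed out
    }
}
```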
There's only a single non-spout bolt in the topology, called
'b-0-events-events-events-Entities-consumer-partition-Entities-bin-partition-consumer-output'.
From what I can see, emitPartitionBatchNew simply isn't being called, even
though it's a new batch and the TridentKafkaSpout implementation guarantees
there is always exactly one partition in the batch.
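In outline, this is the per-batch call sequence I expect (a standalone
illustration with plain types; the real methods come from Storm's
partitioned-spout interfaces, not this toy class):

```java
import java.util.Arrays;
import java.util.List;

// Toy model of the emitter calls per new batch: the first two are observed
// in our worker logs, the third never happens.
public class EmitterCallOrder {
    static List<Integer> getOrderedPartitions() {
        System.out.println("getOrderedPartitions");  // we DO see this logged
        return Arrays.asList(0);                     // always one partition
    }
    static void refreshPartitions(List<Integer> partitions) {
        System.out.println("refreshPartitions");     // we DO see this logged
    }
    static void emitPartitionBatchNew(int partition) {
        System.out.println("emitPartitionBatchNew"); // we NEVER see this
    }
    public static void main(String[] args) {
        List<Integer> partitions = getOrderedPartitions();
        refreshPartitions(partitions);
        for (int p : partitions) {
            emitPartitionBatchNew(p); // expected for each new batch, but missing
        }
    }
}
```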
Can anyone shed any light on what's going on and why the emitter is never
called to output data? Or offer any useful pointers on how I might debug this
further? I've turned on all the logging I can, and there's no obvious next step
left to explore.
Many thanks,
SimonC