Following up on Evan Spark's question from July: we have run into this issue as well.
At some point, the spout worker crashes with the following error in its log:
Any ideas why this happens?
Is it even correct to throw a RuntimeException here?
2014-11-27 05:38:09 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Error when re-emitting batch. overshot the end offset
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.daemon.executor$fn__4822$fn__4834$fn__4881.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.util$async_loop$fn__455.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:701) ~[na:1.6.0_33]
Caused by: java.lang.RuntimeException: Error when re-emitting batch. overshot the end offset
	at storm.kafka.trident.TridentKafkaEmitter.reEmitPartitionBatch(TridentKafkaEmitter.java:162) ~[stormjar.jar:na]
	at storm.kafka.trident.TridentKafkaEmitter.access$500(TridentKafkaEmitter.java:46) ~[stormjar.jar:na]
	at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatch(TridentKafkaEmitter.java:243) ~[stormjar.jar:na]
	at storm.kafka.trident.TridentKafkaEmitter$2.emitPartitionBatch(TridentKafkaEmitter.java:226) ~[stormjar.jar:na]
	at storm.trident.spout.PartitionedTridentSpoutExecutor$Emitter.emitBatch(PartitionedTridentSpoutExecutor.java:133) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:369) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.daemon.executor$fn__4822$tuple_action_fn__4824.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.daemon.executor$mk_task_receiver$fn__4745.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.disruptor$clojure_handler$reify__833.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
	... 6 common frames omitted
The exception seems to originate in TridentKafkaEmitter.reEmitPartitionBatch:
/**
 * re-emit the batch described by the meta data provided
 *
 * @param attempt
 * @param collector
 * @param partition
 * @param meta
 */
private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
    LOG.info("re-emitting batch, attempt " + attempt);
    String instanceId = (String) meta.get("instanceId");
    if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
        SimpleConsumer consumer = _connections.register(partition);
        long offset = (Long) meta.get("offset");
        long nextOffset = (Long) meta.get("nextOffset");
        ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
        for (MessageAndOffset msg : msgs) {
            if (offset == nextOffset) {
                break;
            }
            if (offset > nextOffset) {
                throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
            }
            emit(collector, msg.message());
            offset = msg.nextOffset();
        }
    }
}
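To make the failure condition concrete, here is a minimal standalone model of that loop (a sketch, not storm-kafka code). It assumes each fetched message can be reduced to its offset and that msg.nextOffset() is offset + 1, as in Kafka 0.8's MessageAndOffset; the offset sequences are made up for illustration. The model shows that the loop only throws when a message's offset skips past nextOffset without landing on it exactly, and that by then the out-of-range message has already been emitted. reEmitBounded is a hypothetical alternative that checks each message's own offset against the batch bounds instead of throwing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model of the re-emit loop in TridentKafkaEmitter.reEmitPartitionBatch.
 * Assumption: a fetched message is reduced to its offset, and nextOffset() == offset + 1.
 */
final class ReEmitLoopModel {

    private ReEmitLoopModel() {}

    /** Mirrors the original loop; returns the offsets it would emit. */
    static List<Long> reEmitOriginal(long offset, long nextOffset, long[] fetched) {
        List<Long> emitted = new ArrayList<>();
        for (long msgOffset : fetched) {
            if (offset == nextOffset) {
                break;
            }
            if (offset > nextOffset) {
                throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
            }
            emitted.add(msgOffset);   // stands in for emit(collector, msg.message())
            offset = msgOffset + 1;   // stands in for offset = msg.nextOffset()
        }
        return emitted;
    }

    /**
     * Hypothetical alternative (not from storm-kafka): compare each message's own
     * offset with the batch bounds, so nothing outside [offset, nextOffset) is emitted.
     */
    static List<Long> reEmitBounded(long offset, long nextOffset, long[] fetched) {
        List<Long> emitted = new ArrayList<>();
        for (long msgOffset : fetched) {
            if (msgOffset < offset) {
                continue;             // skip messages before the requested start offset
            }
            if (msgOffset >= nextOffset) {
                break;                // reached or passed the end of the original batch
            }
            emitted.add(msgOffset);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Contiguous offsets: the loop stops cleanly once offset reaches nextOffset = 12.
        System.out.println(reEmitOriginal(10, 12, new long[] {10, 11, 12, 13})); // prints [10, 11]

        // Made-up gap (11 and 12 absent): offset jumps from 11 to 14 after emitting 13.
        long[] gap = {10, 13, 14};
        try {
            reEmitOriginal(10, 12, gap);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // prints the "overshot the end offset" message
        }
        System.out.println(reEmitBounded(10, 12, gap)); // prints [10]
    }
}
```

The bounded variant never throws, but it also silently drops whatever mismatch produced the gap. A transactional spout is expected to re-emit exactly the same batch for a given transaction, so swallowing the mismatch may hide a real problem rather than fix it; that trade-off is essentially the question above.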