Hi all.
Here’s what I’m seeing. I’ve got a fairly simple topology consisting of three
components: a Kafka spout, a simple processing bolt (JSON parsed to a POJO, a
bit of processing, then serialized back to JSON), and a KafkaBolt for output.
12 workers, Xmx1G. It runs happily for a little over a day, then slows down and
eventually stops processing altogether. The cluster is instrumented with
storm-graphite
(https://github.com/verisign/storm-graphite). When the topology is freshly
deployed, spout complete latency averages around 5ms, and JVM heap usage starts
at around 100MB across all workers, with very little PS MarkSweep GC activity.
Heap usage then creeps up slowly on every worker until it starts brushing up
against max heap. At that point the topology is in bad shape: spout complete
latency averages 5 seconds, spout lag starts to increase, spout fail counts
average 300/min, and PS MarkSweep runs average 15+ seconds each (!!!) at
roughly 2 GC runs/worker/minute. The JVMs are pretty much hosed at this point,
and I stop seeing the topology do much useful work.
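(Next time it degrades I plan to capture GC logs as well; these are the
standard HotSpot flags I’d append to the worker childopts below, with the log
path as a placeholder:)
------------------------------------------------------------------------------------------------
topology.worker.childopts: "<existing opts from below>
    -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
    -Xloggc:/tmp/gc-worker-%ID%.log"
------------------------------------------------------------------------------------------------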
I took a heap dump via JVisualVM of one of the maxed-out workers. The heap is
dominated by the following structure:
==============================================================================
field                   type                             retained heap (bytes)
------------------------------------------------------------------------------
this                    acker                                    1,132,188,840
  state                 Container                                1,132,188,816
    object              acker$mk_acker_bolt$reify__803           1,132,188,792
      output_collector  MutableObject                                      184
      pending           MutableObject                            1,132,188,568
        o               RotatingMap                              1,132,188,544
...
==============================================================================
Within the acker’s pending map we eventually get to a huge HashMap (8,388,608
entries!) of <Long, PersistentArrayMap<Keyword, Long>> pairs, e.g.:
key: 3133635607298342113, value: [ clojure.lang.Keyword #11,
7092611081953912005 ]
This is probably what’s causing my workers to run out of heap. But why is each
worker keeping track of so many pending tuples? My processing RichBolt acks
every input tuple immediately (since I fail fast if I run into any parsing
issues), so I can’t think of a reason why 8 million+ tuples would still be
pending. My current understanding of the bookkeeping is sketched below.
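If I understand the acker right (hedging here — this is from reading the docs
and the dump, not the source), each pending entry is keyed by spout tuple id,
and its value (presumably the {:spout-task, :val} PersistentArrayMap above)
holds the XOR of all tuple ids in that tuple’s tree. Acking my input tuple
only clears one edge; every anchored emit adds another edge that stays pending
until the KafkaBolt acks it, or the topology.message.timeout.secs rotation
(default 30s) expires the whole entry. A minimal sketch of that reading,
mirroring my execute() below:
------------------------------------------------------------------------------------------------
public void execute(final Tuple tuple) {
    // completes only the input edge of the spout tuple's tree
    this.outputCollector.ack(tuple);
    for (Map<String, Object> m : maps) {
        // each anchored emit XORs a new tuple id into the spout tuple's
        // pending entry; it clears only once kafka-output acks it
        this.outputCollector.emit(tuple, new Values(toJson(m)));
    }
}
------------------------------------------------------------------------------------------------
If that reading is right, the 30s timeout should be expiring these entries,
which makes an 8M+ buildup even stranger to me.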
Any ideas what might be causing this apparent leak? I feel like we’re running
a pretty stock topology. Restarting the topology every day is the only
reliable way I’ve found to keep it healthy, which is not exactly the most
scalable solution :p
Here are more details about my topology (including the settings I believe I’ve
changed away from the defaults).
* storm-core, storm-kafka 0.9.5
* one acker per worker, so 12 ackers (set roughly as shown just below).
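(A minimal sketch of how the acker count is set, via the 0.9.5 Config API;
equivalent to topology.acker.executors: 12:)
------------------------------------------------------------------------------------------------
config.setNumAckers(12); // one acker executor per worker
------------------------------------------------------------------------------------------------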
.yaml Config:
------------------------------------------------------------------------------------------------
topology.debug: false
# max tuples that can be pending on a spout task at any given time
topology.max.spout.pending: 2500
topology.spout.max.batch.size: 65 * 1024
topology.workers: 12
topology.worker.childopts: "-Dcom.sun.management.jmxremote
    -Dcom.sun.management.jmxremote.ssl=false
    -Dcom.sun.management.jmxremote.authenticate=false
    -Dcom.sun.management.jmxremote.port=1%ID%
    -XX:+HeapDumpOnOutOfMemoryError
    -XX:HeapDumpPath=/tmp/"
------------------------------------------------------------------------------------------------
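(For completeness, the programmatic equivalents I believe these map to in
backtype.storm.Config; everything else, including topology.message.timeout.secs
= 30, is at the defaults:)
------------------------------------------------------------------------------------------------
final Config config = new Config();
config.setDebug(false);           // topology.debug
config.setMaxSpoutPending(2500);  // topology.max.spout.pending
config.setNumWorkers(12);         // topology.workers
------------------------------------------------------------------------------------------------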
Topology Construction:
------------------------------------------------------------------------------------------------
public static void buildProcessingTopology(final TopologyBuilder builder,
                                           final Config config,
                                           final String environment) {
    final Map<String, Object> kafkaConfig =
            checkNotNull((Map<String, Object>) config.get(CFG_KAFKA));
    final Map<String, Object> zookeeperConfig =
            checkNotNull((Map<String, Object>) config.get(CFG_ZOOKEEPER));
    final ZkHosts hosts = new ZkHosts((String) zookeeperConfig.get("address"));
    final String CLIENT_ID = "storm-spout-" + environment;
    final SpoutConfig spoutConfig = new SpoutConfig(hosts,
            checkNotNull((String) kafkaConfig.get("input_topic")),
            checkNotNull((String) zookeeperConfig.get("root")),
            CLIENT_ID);
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
    builder.setBolt("processor", new ProcessorBolt("json"), 4)
           .shuffleGrouping("kafka-spout");
    builder.setBolt("kafka-output",
            new KafkaBolt<String, String>()
                    .withTopicSelector(new DefaultTopicSelector(
                            checkNotNull((String) kafkaConfig.get("output_topic"))))
                    .withTupleToKafkaMapper(
                            new FieldNameBasedTupleToKafkaMapper("key", "json")),
            1)
           .shuffleGrouping("processor");
}
------------------------------------------------------------------------------------------------
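(Submission is the stock pattern, trimmed here; the topology name and
environment string are placeholders:)
------------------------------------------------------------------------------------------------
final TopologyBuilder builder = new TopologyBuilder();
final Config config = new Config(); // merged with the .yaml settings above
buildProcessingTopology(builder, config, "production");
StormSubmitter.submitTopology("processing-topology", config,
        builder.createTopology());
------------------------------------------------------------------------------------------------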
Processing Bolt (simplified):
------------------------------------------------------------------------------------------------
public class ProcessorBolt extends BaseRichBolt {

    private String outputField;
    private OutputCollector outputCollector;

    protected ProcessorBolt(final String outputField) {
        this.outputField = outputField;
    }

    public void prepare(final Map conf, final TopologyContext context,
                        final OutputCollector collector) {
        this.outputCollector = collector;
    }

    public void execute(final Tuple tuple) {
        final String wirejson = tuple.getString(0);
        // always ack, nothing to recover from if we can't parse/process
        this.outputCollector.ack(tuple);
        // parse json, process some stuff, get a list of maps back
        // ...
        for (Map<String, Object> m : maps) {
            final String json = toJson(m);
            this.outputCollector.emit(tuple, new Values(json));
        }
    }

    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(this.outputField));
    }
}
------------------------------------------------------------------------------------------------
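(To double-check emit vs. ack volume, I’m considering instrumenting the bolt
with Storm’s built-in metrics API, which storm-graphite should pick up. This
is a sketch, not what’s deployed; the class and metric names are made up:)
------------------------------------------------------------------------------------------------
import java.util.Map;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class InstrumentedProcessorBolt extends BaseRichBolt {

    private transient OutputCollector outputCollector;
    private transient CountMetric acked;   // input tuples acked
    private transient CountMetric emitted; // anchored tuples emitted

    public void prepare(final Map conf, final TopologyContext context,
                        final OutputCollector collector) {
        this.outputCollector = collector;
        // 60-second buckets, reported through the registered metrics consumers
        this.acked = context.registerMetric("acked", new CountMetric(), 60);
        this.emitted = context.registerMetric("emitted", new CountMetric(), 60);
    }

    public void execute(final Tuple tuple) {
        this.outputCollector.ack(tuple);
        acked.incr();
        // ... parse/process as before; call emitted.incr() per anchored emit ...
    }

    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("json"));
    }
}
------------------------------------------------------------------------------------------------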
Let me know if you need any additional info.
Thanks in advance,
-Jason