Hi,
You should ack the input tuple after you have emitted the new ones:
try {
    // parse the JSON string
    ...
    // then emit the anchored tuples
} catch (Throwable t) {
    /* nothing to recover from */
} finally {
    // ack only after all anchored emits have gone out
    collector.ack(tuple);
}
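
Applied to your ProcessorBolt, a minimal sketch of that ordering could look like this (parseAndProcess is just a placeholder for your elided parsing code; the rest is taken from your class):

public void execute(final Tuple tuple) {
    try {
        // parse json, process, get a list of maps back
        // (parseAndProcess stands in for your own code here)
        final List<Map<String, Object>> maps = parseAndProcess(tuple.getString(0));
        for (final Map<String, Object> m : maps) {
            // anchor to the input tuple *before* it is acked
            this.outputCollector.emit(tuple, new Values(toJson(m)));
        }
    } catch (final Throwable t) {
        // nothing to recover from if parsing/processing fails
    } finally {
        // the ack now reports the ids of everything emitted above
        this.outputCollector.ack(tuple);
    }
}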
Hope this will fix your issue.
On Aug 21, 2015, at 02:17, "Jason Chen" <[email protected]> wrote:
> Hi all.
>
> Here’s what I’m seeing. I’ve got a fairly simple topology consisting of
> three components: a Kafka spout, a simple processing bolt (JSON parse to
> POJO, a bit of processing, and back to JSON), and a Kafka bolt (output).
> 12 workers, Xmx1G. It runs happily for a little over a day, then basically
> slows down or stops processing altogether. Cluster is instrumented with
> storm-graphite (https://github.com/verisign/storm-graphite). When the
> topology is freshly deployed, spout complete latency averages around *5ms*,
> and JVM heap usage starts at around *100MB* across all workers. Very
> little PSMarkSweep GC activity. Heap usage slowly creeps up across all
> workers, until they start brushing up against max heap. At this
> point, the topology is pretty sad: spout complete latency averages *5sec*,
> spout lag starts to increase, spout fail counts average *300*/min, and
> PSMarkSweep GC averages *15+ seconds per run* (!!!) averaging ~2 GC
> runs/worker/minute. The JVMs are pretty much hosed at this point, and I
> stop seeing the topology doing much useful work.
>
> I took a heap dump via JVisualVM of one of the maxed-out workers. The
> majority of the heap usage is dominated by the following structure:
>
>
> ==============================================================================
> field              type                              retained heap
> ------------------------------------------------------------------------------
> this               acker                             1,132,188,840
> state              Container                         1,132,188,816
> object             acker$mk_acker_bolt$reify__803    1,132,188,792
> output_collector   MutableObject                     184
> pending            MutableObject                     1,132,188,568
> o                  RotatingMap                       1,132,188,544
> ...
>
> Within the acker’s pending map, eventually we get to a huge HashMap
> (8,388,608 items!) with <Long, PersistentArrayMap<Keyword, Long>> pairs,
> e.g.:
> key: 3133635607298342113, value: [ clojure.lang.Keyword
> #11, 7092611081953912005 ]
>
> This is probably what’s causing my workers to run out of heap. But why is
> each worker keeping track of so many pending tuples? My processing
> RichBolt acks immediately (since I fail-fast if I run into any parsing
> issues), so I can’t think of a reason why so many tuples would be pending
> (8 million+?).
>
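Jumping in inline here: that pending map is the acker's tuple-tree tracker. Conceptually it behaves like the sketch below. This is illustrative only, with made-up names; the real acker is written in Clojure, and its map value also carries the spout task id alongside the XOR value, which is the Keyword/Long pair you see in the dump.

import java.util.HashMap;
import java.util.Map;

// Conceptual sketch of the acker's bookkeeping, not Storm's actual code.
class AckerSketch {
    // root tuple id -> XOR of all edge ids still outstanding in that tree
    private final Map<Long, Long> pending = new HashMap<>();

    void onAck(final long rootId, final long xorOfEdgeIds) {
        final long val = pending.getOrDefault(rootId, 0L) ^ xorOfEdgeIds;
        if (val == 0L) {
            pending.remove(rootId);    // tree complete: the spout gets the ack
        } else {
            pending.put(rootId, val);  // still waiting on some edge ids
        }
    }
}

Because your bolt acks before it emits, the ack message it sends cannot include the edge ids of the tuples emitted afterwards. When the KafkaBolt later acks those tuples, the XOR for that tree can never reach zero, so each entry lingers in the RotatingMap until the message timeout rotates it out, which lines up with the spout fail counts you are seeing.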
> Any ideas what might be causing this apparent leak? I feel like we’re
> running a pretty stock topology. Restarting the topology every day is the
> only reliable way to keep it running, and that's not exactly the most
> scalable solution :p
>
> *Here are more details about my topology (including settings that I
> believe I’ve changed away from the default):*
> * storm-core, storm-kafka 0.9.5
> * one acker per worker, so 12 ackers.
>
> *.yaml Config:*
> ------------------------------------------------------------------------
> topology.debug: false
> # The maximum number of tuples that can be pending on a spout task at
> # any given time:
> topology.max.spout.pending: 2500
> topology.spout.max.batch.size: 65 * 1024
> topology.workers: 12
>
> topology.worker.childopts: "-Dcom.sun.management.jmxremote
> -Dcom.sun.management.jmxremote.ssl=false
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.port=1%ID% -XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath=/tmp/"
> ------------------------------------------------------------------------
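
One more inline note on those settings: with a single spout task and topology.max.spout.pending: 2500, a healthy run should have at most roughly 2,500 tuple trees in flight at any moment, so a pending map with 8,388,608 entries in one acker means trees are being opened and never completed, not that the load is simply too high.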
>
> *Topology Construction:*
> ------------------------------------------------------------------------
> public static void buildProcessingTopology(final TopologyBuilder builder,
>         final Config config, final String environment) {
>     final Map<String, Object> kafkaConfig =
>         checkNotNull((Map<String, Object>) config.get(CFG_KAFKA));
>     final Map<String, Object> zookeeperConfig =
>         checkNotNull((Map<String, Object>) config.get(CFG_ZOOKEEPER));
>
>     final ZkHosts hosts = new ZkHosts((String) zookeeperConfig.get("address"));
>     final String CLIENT_ID = "storm-spout-" + environment;
>     final SpoutConfig spoutConfig = new SpoutConfig(hosts,
>         checkNotNull((String) kafkaConfig.get("input_topic")),
>         checkNotNull((String) zookeeperConfig.get("root")), CLIENT_ID);
>     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>
>     builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
>
>     builder.setBolt("processor", new ProcessorBolt("json"), 4)
>         .shuffleGrouping("kafka-spout");
>
>     builder.setBolt("kafka-output",
>             new KafkaBolt<String, String>()
>                 .withTopicSelector(new DefaultTopicSelector(
>                     checkNotNull((String) kafkaConfig.get("output_topic"))))
>                 .withTupleToKafkaMapper(
>                     new FieldNameBasedTupleToKafkaMapper("key", "json")),
>             1)
>         .shuffleGrouping("processor");
> }
> ------------------------------------------------------------------------
>
>
> *Processing Bolt (simplified):*
> ------------------------------------------------------------------------
> public class ProcessorBolt extends BaseRichBolt {
>
>     private String outputField;
>     private OutputCollector outputCollector;
>
>     protected ProcessorBolt(final String outputField) {
>         this.outputField = outputField;
>     }
>
>     public void prepare(final Map conf, final TopologyContext context,
>             final OutputCollector collector) {
>         this.outputCollector = collector;
>     }
>
>     public void execute(final Tuple tuple) {
>         final String wirejson = tuple.getString(0);
>
>         // always ack, nothing to recover from if we can't parse/process
>         this.outputCollector.ack(tuple);
>
>         // parse json, process some stuff, get a list of maps back
>         // ...
>
>         for (Map<String, Object> m : maps) {
>             String json = toJson(m);
>             this.outputCollector.emit(tuple, new Values(json));
>         }
>     }
>
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields(this.outputField));
>     }
> }
> ------------------------------------------------------------------------
>
> Let me know if you need any additional info.
>
> Thanks in advance,
> -Jason
>