Hi,

You should ack input tuple after emitting new ones :

try {
// parse json string
...
// then emit
} catch (Throwable t) {
/*nothing to recover */
} finally {
collector.ack(tuple)
}

Hope this will fix your issue.
Le 21 août 2015 02:17, "Jason Chen" <[email protected]> a écrit :

> Hi all.
>
> Here’s what I’m seeing.  I’ve got a fairly simple topology, consisting of
> 3 bolts.  Kafka spout, simple processing bolt (JSON parse to POJO, a bit of
> processing, and back to JSON), and Kafka Bolt (output).  12 workers,
> Xmx1G.  It runs happily for a little over a day, then basically slows
> down/stops processing altogether.  Cluster is instrumented with
> storm-graphite (https://github.com/verisign/storm-graphite).  When the
> topology is freshly deployed, spout complete latency averages around *5ms*,
> and JVM heap usage starts at around *100MB* across all workers.  Very
> little PSMarkSweep GC activity.  The workers slowly creep up in heap usage
> across all workers, until they start brushing up against max heap.  At this
> point, the topology is pretty sad: spout complete latency averages *5sec*,
> spout lag starts to increase, spout fail counts average *300*/min, and
> PSMarkSweep GC averages *15+ seconds per run* (!!!) averaging ~2 GC
> runs/worker/minute.  The JVMs are pretty much hosed at this point, and I
> stop seeing the topology doing much useful work.
>
> I took a heapdump via JVisualVM of one of the maxed out workers.  The
> majority of the heap usage is dominated by the following structure:
>
>
> ==============================================================================
> field              type                            retained heap
>
> ------------------------------------------------------------------------------
> this               acker                           1,132,188,840
> state              Container                       1,132,188,816
> object             acker$mk_acker_bolt$reify__803  1,132,188,792
> output_collector   MutableObject                   184
> pending            MutableObject                   1,132,188,568
>   o                RotatingMap                     1,132,188,544
> ...
>
> Within the acker’s pending map, eventually we get to a huge HashMap
> (8,388,608 items!) with <Long, PersistentArrayMap<Keyword, Long> pairs,
> eg:
> key: 3133635607298342113, value: [ clojure.lang.Keyword
> #11, 7092611081953912005 ]
>
> This is probably what’s causing my workers to run out of heap.  But why is
> each worker keeping track of so many pending (tuples?)  My processing
> RichBolt acks immediately (since I fail-fast if I run into any parsing
> issues), so I can’t think of a reason why so many tuples would be pending
> (8 million+ ?).
>
> Any ideas what might be causing this apparent leak?  I feel like we’re
> running a pretty stock topology.  Restarting the topology every day is the
> only reliable way to keep my topology running, not exactly the most
> scalable solution :p
>
> *Here are more details about my topology (including settings that I
> believe I’ve changed away from the default).*
> * storm-core, storm-kafka *0.9.5*
> ** *one acker per worker, so 12 ackers.
>
> *.yaml Config:*
> *------------**------------**------------**------------**------------*
> *------------**------------**------------*
> topology.debug: false
> topology.max.spout.pending: 2500 # The maximum number of tuples that can
> be pending on a spout task at any given time.
> topology.spout.max.batch.size: 65 * 1024
> topology.workers: 12
>
> topology.worker.childopts: "-Dcom.sun.management.jmxremote
> -Dcom.sun.management.jmxremote.ssl=false
> -Dcom.sun.management.jmxremote.authenticate=false
> -Dcom.sun.management.jmxremote.port=1%ID% -XX:+HeapDumpOnOutOfMemoryError
> -XX:HeapDumpPath=/tmp/“
> *------------**------------**------------**------------**------------*
> *------------**------------**------------*
>
> *Topology Construction:*
> *------------**------------**------------**------------**------------*
> *------------**------------**------------*
> public static void buildProcessingTopology(final TopologyBuilder builder,
> final Config config, final String environment) {
>     final Map<String, Object> kafkaConfig = checkNotNull((Map<String,
> Object>) config.get(CFG_KAFKA));
>     final Map<String, Object> zookeeperConfig = checkNotNull((Map<String,
> Object>) config.get(CFG_ZOOKEEPER));
>
>     final ZkHosts hosts = new ZkHosts((String)
> zookeeperConfig.get("address"));
>     final String CLIENT_ID = "storm-spout-" + environment;
>     final SpoutConfig spoutConfig = new SpoutConfig(hosts,
> checkNotNull((String) kafkaConfig.get("input_topic")),
>                                                     checkNotNull((String)
> zookeeperConfig.get("root")), CLIENT_ID);
>     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>
>
>     builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
>
>     builder.setBolt("processor", new ProcessorBolt("json"),
> 4).shuffleGrouping("kafka-spout");
>
>     builder.setBolt("kafka-output",
>                     new KafkaBolt<String, String>()
>                         .withTopicSelector(new
> DefaultTopicSelector(checkNotNull((String)
> kafkaConfig.get("output_topic"))))
>                         .withTupleToKafkaMapper(new
> FieldNameBasedTupleToKafkaMapper("key", "json")),
>                     1).shuffleGrouping("processor");
> }
> *------------**------------**------------**------------**------------*
> *------------**------------**------------*
>
>
> *Processing Bolt (simplified):*
> *------------**------------**------------**------------**------------*
> *------------**------------**------------*
> public class ProcessorBolt extends BaseRichBolt {
>
>     private String outputField;
>     private OutputCollector outputCollector;
>
>     protected ProcessorBolt(final String outputField) {
>         this.outputField = outputField;
>     }
>
>     public void prepare(final Map conf, final TopologyContext context,
> final OutputCollector collector) {
>         this.outputCollector = collector;
>     }
>
>     public void execute(final Tuple tuple) {
>         final String wirejson = tuple.getString(0);
>
>         // always ack, nothing to recover from if we can't parse/process
>         this.outputCollector.ack(tuple);
>
>         // parse json, process some stuff, get a list of maps back
>         // ...
>
>         for(Map<String, Object> m : maps) {
>             String json = toJson(m);
>
>             this.outputCollector.emit(tuple, new Values(json));
>         }
>     }
>
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields(this.outputField));
>     }
> }
> *------------**------------**------------**------------**------------*
> *------------**------------**------------*
>
> Let me know if you need any additional info.
>
> Thanks in advance,
> -Jason
>

Reply via email to