I have also run into this issue recently. I have a topology that runs fine for roughly 12 to 48 hours, and then the complete() method in my BaseAggregator suddenly stops being called entirely. I'm looking for some guidance:
- Under which conditions would this happen?
- I am using an OpaqueTridentKafkaSpout (https://github.com/wurstmeister/storm-kafka-0.8-plus/blob/master/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java). Could a property of opaque spouts be causing this?
- Is there anything beyond what is covered in https://github.com/nathanmarz/storm/wiki/Trident-state#persistentaggregate, https://github.com/nathanmarz/storm/wiki/Trident-spouts and https://github.com/nathanmarz/storm/wiki/Trident-API-Overview that I should know when using aggregators in Trident?

On Fri, Jul 26, 2013 at 9:03 AM, Laurent <[email protected]> wrote:
> I'm having the same problem here.
> Actually, it works fine in LocalCluster mode but it doesn't work when I
> deploy it using StormSubmitter.
> Well, that's not entirely right: it works the first time it's executed but
> then stops passing through complete().
>
> I noticed in init() that the batch id evolves like this:
> 1:0
> 2:0
> 2:1
> 2:2
> 2:3
> 2:4
> ...
> 2:n
>
> So I can see something is wrong, but I can't find what.
> Have you noticed the same behavior on your side?
> Did you have any luck finding where your problem came from?
>
> I've tried switching to a CombinerAggregator: same issue.
>
> Regards
> Laurent
>
>
> On Thursday, January 17, 2013 4:57:46 PM UTC+1, Paul Bogen wrote:
>>
>> In my TridentTopology I have a portion where I split a string into words
>> and counts. I then group the tuples by the path of the file they come from
>> and finally I aggregate the groups into a List using a
>> custom BaseAggregator.
>>
>> Relevant section of topology:
>>
>> stream = stream.each(new Fields("content"), new VectorizeFunction(),
>>         new Fields("term", "count")).parallelismHint(8);
>> GroupedStream gstream = stream.groupBy(new Fields("path"));
>> stream = gstream.aggregate(new Fields("term", "weight"), new VectorAggregator(),
>>         new Fields("vector")).parallelismHint(8);
>>
>> The problem is that the complete method is never called to emit the list.
>>
>> Here is the aggregator:
>>
>> import java.util.concurrent.ConcurrentHashMap;
>> import java.util.concurrent.ConcurrentMap;
>> import backtype.storm.tuple.Values;
>>
>> import storm.trident.operation.BaseAggregator;
>> import storm.trident.operation.TridentCollector;
>> import storm.trident.tuple.TridentTuple;
>>
>> public class VectorAggregator extends BaseAggregator<ConcurrentMap<String, Double>> {
>>
>>     // Creates fresh aggregation state at the start of a batch
>>     public ConcurrentMap<String, Double> init(Object batchId, TridentCollector collector) {
>>         return new ConcurrentHashMap<>();
>>     }
>>
>>     // Called for each input tuple: records term -> weight
>>     public void aggregate(ConcurrentMap<String, Double> val, TridentTuple tuple,
>>             TridentCollector collector) {
>>         val.put(tuple.getStringByField("term"), tuple.getDoubleByField("weight"));
>>     }
>>
>>     // Called at the end of the batch: emits the accumulated map
>>     public void complete(ConcurrentMap<String, Double> val, TridentCollector collector) {
>>         collector.emit(new Values(val));
>>     }
>> }
>>
>> What am I doing wrong?
>>
>> plb
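A note on Laurent's batch-id log, for anyone comparing: one plausible reading, assuming the id passed to init() prints as Trident's transaction attempt in txid:attempt form, is that 2:0, 2:1, ... 2:n is the same batch (txid 2) being replayed again and again. Since complete() is only reached when a batch attempt finishes, a batch that keeps failing would never get there. The sketch below is a plain-Java model of that lifecycle, not the Storm API: `SimpleAggregator`, `VectorModel`, `runAttempt` and `simulate` are hypothetical names, and the failure on every attempt of txid 2 is simulated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a Trident-style aggregator lifecycle. This is NOT the
// Storm API: SimpleAggregator, VectorModel, runAttempt and simulate are
// hypothetical names used only for illustration.
public class AggregatorLifecycleDemo {

    // Stand-in for the init/aggregate/complete callbacks of an aggregator.
    interface SimpleAggregator<S> {
        S init(String batchId);
        void aggregate(S state, String term, double weight);
        void complete(S state, List<Object> emitted);
    }

    // Same logic as VectorAggregator in the thread, without Storm types.
    static class VectorModel implements SimpleAggregator<Map<String, Double>> {
        public Map<String, Double> init(String batchId) {
            return new HashMap<>();
        }
        public void aggregate(Map<String, Double> state, String term, double weight) {
            state.put(term, weight);
        }
        public void complete(Map<String, Double> state, List<Object> emitted) {
            emitted.add(state); // the only place anything is emitted
        }
    }

    // Every id init() was called with, and everything complete() emitted.
    static final class Run {
        final List<String> initLog = new ArrayList<>();
        final List<Object> emitted = new ArrayList<>();
    }

    // Processes one attempt of one batch. If failAt is a valid tuple index,
    // the attempt "fails" on that tuple (simulated), so complete() is skipped.
    static <S> boolean runAttempt(SimpleAggregator<S> agg, long txid, int attempt,
                                  List<String[]> tuples, int failAt, Run run) {
        String batchId = txid + ":" + attempt; // same shape as the ids in the log
        run.initLog.add(batchId);
        S state = agg.init(batchId);
        for (int i = 0; i < tuples.size(); i++) {
            if (i == failAt) {
                return false; // attempt failed: complete() is never reached
            }
            String[] t = tuples.get(i);
            agg.aggregate(state, t[0], Double.parseDouble(t[1]));
        }
        agg.complete(state, run.emitted);
        return true;
    }

    // Batch 1 succeeds; batch 2 fails on every attempt and is replayed with
    // attempt + 1 each time. maxAttempts caps the replays so the demo ends.
    static Run simulate(int maxAttempts) {
        List<String[]> tuples = Arrays.asList(
                new String[] {"storm", "2.0"},
                new String[] {"trident", "1.0"});
        Run run = new Run();
        VectorModel agg = new VectorModel();
        runAttempt(agg, 1, 0, tuples, -1, run);
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (runAttempt(agg, 2, attempt, tuples, 1, run)) {
                break;
            }
        }
        return run;
    }

    public static void main(String[] args) {
        Run run = simulate(5);
        System.out.println("init() batch ids: " + run.initLog);
        System.out.println("complete() emits: " + run.emitted.size());
    }
}
```

Running main logs the init() ids 1:0, 2:0, 2:1, 2:2, 2:3, 2:4 and a single emit from complete(), for batch 1 only, which has the same shape as Laurent's log. If that reading is right, the thing to chase would be why the batch keeps failing (for example an exception in a later operation, or the batch exceeding topology.message.timeout.secs) rather than the aggregator itself.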
