unsubscribe
How to retrieve the offsets of messages with storm-kafka
Hi all, I would like to know if there is a way to get the offset/partition of each message using the KafkaSpout? Thanks in advance -- Florian HUSSONNOIS
Re: storm cassandra integration
Hi Crina, You can fork the external storm-cassandra module from the master branch. This module should be compatible with Storm versions 0.9.5 to 0.11. The current fluent API is not stable and will be deprecated, so you should directly use the following class: https://github.com/apache/storm/blob/master/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java

2015-12-08 14:47 GMT+01:00 Crina Arsenie <arsenie.cr...@gmail.com>:

> Hello, I was planning to integrate Cassandra with a Storm project I am working on. I am using the latest stable version of Storm (0.10). But then I researched and found that the storm-cassandra component from Storm's external modules is only available starting from the master branch on GitHub, so not with this version.
> Therefore, I searched for an earlier storm-cassandra component and found https://github.com/hmsonline/storm-cassandra (latest stable 0.3.1). I then had problems using it, as it targets neither the latest version of Storm (only the old 0.8.1) nor of Cassandra (1.1.2 instead of the current 2.2.3), and, worst of all, the part that gave me headaches before I understood my integration errors: it was compiled under Java 1.6!
> So my question is: do you think I can find, in the current state of things, a proper way to integrate Cassandra bolts?
> Thank you in advance for your help. Crina

-- Florian HUSSONNOIS
Yet another Storm Cassandra connector
Hi all, I would like to share with you a new connector between Storm and Cassandra. This library provides a fluent API for building CQL bolts. Queries can be executed and handled asynchronously by bolts (via the DataStax java-driver). https://github.com/fhussonnois/storm-cassandra Thanks for your feedback. -- Florian HUSSONNOIS
Re: Field Group Hash Computation
Thanks Derek. I use strings and I still end up with some bolts having the maximum requests :(

On Tue, Sep 29, 2015 at 5:03 PM, Derek Dagit <der...@yahoo-inc.com> wrote:

> The code that hashes the field values is here:
> https://github.com/apache/storm/blob/9d911ec1b4f7b5aabe646a5d2cd31591fe4df1b0/storm-core/src/clj/backtype/storm/tuple.clj#L24
>
> You can write a little java program, something like:
>
>     public static void main(String[] args) {
>         ArrayList<Object> myList = new ArrayList<>();
>         myList.add("first field value");
>         myList.add("second field value");
>
>         int hash = Arrays.deepHashCode(myList.toArray()); // as in tuple.clj
>
>         System.out.println("hash is " + hash);
>         int numTasks = 32;
>         System.out.println("task index is " + (hash % numTasks));
>     }
>
> There are certain types of values that may not hash consistently. If you are using String values, then it should be fine. Other types may or may not, depending on how the class implements hashCode().
> --
> Derek
>
> From: Kashyap Mhaisekar <kashya...@gmail.com>
> To: user@storm.apache.org
> Sent: Tuesday, September 29, 2015 4:28 PM
> Subject: Field Group Hash Computation
>
> Hi,
> I have a field grouping based on 2 fields. I have 32 consumers for the tuple, and I see that most of the time, out of 64 bolts, the field group always lands on 8 of them. Of those 8, 2 have more than 60% of the data. The data for the field grouping can have 20 different combinations.
>
> Do you know the way to compute the hash of the fields used for grouping? One of the group's mails indicates that the approach is:
>
> It calls "hashCode" on the list of selected values and mods it by the number of consumer tasks. You can play around with that function to see if something about your data is causing something degenerative to happen and cause skew.
>
> I saw the Clojure code but am not sure how to understand it.
>
> Thanks
> Kashyap

-- Florian HUSSONNOIS
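Derek's sketch above can be made self-contained. One caveat worth noting: Java's % operator returns a negative value for a negative hash, while the Clojure mod used in Storm's tuple.clj is always non-negative, so a faithful sketch normalizes the result (the class and method names below are mine, not Storm's):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FieldGroupHash {
    // Mirrors the hashing referenced in tuple.clj: Arrays.deepHashCode over
    // the selected field values, then mod by the number of consumer tasks.
    public static int taskIndex(List<Object> fieldValues, int numTasks) {
        int hash = Arrays.deepHashCode(fieldValues.toArray());
        // Normalize: Java's % can be negative, Clojure's mod cannot.
        return ((hash % numTasks) + numTasks) % numTasks;
    }

    public static void main(String[] args) {
        List<Object> myList = new ArrayList<>();
        myList.add("first field value");
        myList.add("second field value");
        System.out.println("task index is " + taskIndex(myList, 32));
    }
}
```

Feeding all 20 of your field-value combinations through such a function shows exactly which task indices they collapse onto, which would explain the skew you are seeing.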
Trident - how to implement distinct() operation ?
Hi all, In Trident, what is the best way to keep only distinct tuples based on specified fields? This operation must be applied per batch. I ended up implementing an Aggregator as follows:

    public class Distinct extends BaseAggregator<Map<List<Object>, List<Object>>> {

        private final Fields fields;

        public Distinct(Fields fields) {
            this.fields = fields;
        }

        @Override
        public Map<List<Object>, List<Object>> init(Object batchId, TridentCollector collector) {
            return new HashMap<>();
        }

        @Override
        public void aggregate(Map<List<Object>, List<Object>> state, TridentTuple tuple, TridentCollector collector) {
            List<Object> key = new ArrayList<>(fields.size());
            key.addAll(fields.toList().stream().map(tuple::getValueByField).collect(Collectors.toList()));
            if (!state.containsKey(key)) {
                state.put(key, tuple.getValues());
            }
        }

        @Override
        public void complete(Map<List<Object>, List<Object>> state, TridentCollector collector) {
            state.values().forEach(collector::emit);
        }
    }

However, that implementation seems cumbersome because we have to declare all the input/output fields:

    stream.partitionBy(new Fields("type"))
          .partitionAggregate(new Fields("sensor-id", "type", "ts"),
                              new Distinct(new Fields("type")),
                              new Fields("sensor-id", "type", "ts"));

Another solution could be to use a Filter, but is there a way to get the batch ID? A nice feature would be to have this operation directly on the Stream class: stream.distinct(new Fields("type")) and stream.partitionDistinct(new Fields("type")). Thank you in advance. -- Florian HUSSONNOIS
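The per-batch dedup logic of that aggregator can be sketched standalone, stripped of the Trident types (key positions stand in for the configured Fields; names are illustrative):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchDistinct {
    // Keep the first tuple seen for each distinct key within one batch.
    // keyIndices selects which positions of each tuple form the key,
    // playing the role of the Fields argument in the Trident aggregator.
    public static List<List<Object>> distinct(List<List<Object>> batch, int... keyIndices) {
        Map<List<Object>, List<Object>> state = new LinkedHashMap<>();
        for (List<Object> tuple : batch) {
            List<Object> key = new ArrayList<>(keyIndices.length);
            for (int i : keyIndices) {
                key.add(tuple.get(i));
            }
            state.putIfAbsent(key, tuple);  // emulates the containsKey/put pair
        }
        return new ArrayList<>(state.values());
    }
}
```

Because List implementations define equals/hashCode element-wise, a List<Object> key works directly as a HashMap key, which is what makes the aggregator above correct.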
Re: Workers maxing out heap, excessive GC, mk_acker_bolt-pending map is huge (0.9.5)
Hi, You should ack the input tuple after emitting new ones:

    try {
        // parse json string ...
        // then emit
    } catch (Throwable t) {
        // nothing to recover
    } finally {
        collector.ack(tuple);
    }

Hope this will fix your issue.

On 21 Aug 2015 at 02:17, Jason Chen <jason.c...@hulu.com> wrote:

Hi all. Here's what I'm seeing. I've got a fairly simple topology consisting of 3 components: a Kafka spout, a simple processing bolt (JSON parse to POJO, a bit of processing, and back to JSON), and a Kafka bolt (output). 12 workers, Xmx1G. It runs happily for a little over a day, then basically slows down/stops processing altogether. The cluster is instrumented with storm-graphite (https://github.com/verisign/storm-graphite). When the topology is freshly deployed, spout complete latency averages around 5ms, and JVM heap usage starts at around 100MB across all workers, with very little PSMarkSweep GC activity. The workers slowly creep up in heap usage until they start brushing up against max heap. At this point, the topology is pretty sad: spout complete latency averages 5sec, spout lag starts to increase, spout fail counts average 300/min, and PSMarkSweep GC averages 15+ seconds per run (!!!) at ~2 GC runs/worker/minute. The JVMs are pretty much hosed at this point, and I stop seeing the topology doing much useful work.

I took a heap dump via JVisualVM of one of the maxed-out workers. The majority of the heap usage is dominated by the following structure:

    field                                        retained heap
    this (acker)                                 1,132,188,840
    state (Container)                            1,132,188,816
    object (acker$mk_acker_bolt$reify__803)      1,132,188,792
    output_collector (MutableObject)             184
    pending (MutableObject)                      1,132,188,568
    o (RotatingMap)                              1,132,188,544
    ...

Within the acker's pending map, we eventually get to a huge HashMap (8,388,608 items!) of Long keys mapped to PersistentArrayMap values of [Keyword, Long] pairs, e.g. key: 3133635607298342113, value: [ clojure.lang.Keyword #11, 7092611081953912005 ]. This is probably what's causing my workers to run out of heap.
But why is each worker keeping track of so many pending tuples? My processing RichBolt acks immediately (since I fail fast if I run into any parsing issues), so I can't think of a reason why so many tuples would be pending (8 million+?). Any ideas what might be causing this apparent leak? I feel like we're running a pretty stock topology. Restarting the topology every day is the only reliable way to keep it running, which is not exactly the most scalable solution :p

Here are more details about my topology (including settings that I believe I've changed away from the default):

* storm-core, storm-kafka 0.9.5
* one acker per worker, so 12 ackers

.yaml Config:

    topology.debug: false
    topology.max.spout.pending: 2500  # The maximum number of tuples that can be pending on a spout task at any given time.
    topology.spout.max.batch.size: 65 * 1024
    topology.workers: 12
    topology.worker.childopts: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1%ID% -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/"

Topology Construction:

    public static void buildProcessingTopology(final TopologyBuilder builder, final Config config, final String environment) {
        final Map<String, Object> kafkaConfig = checkNotNull((Map<String, Object>) config.get(CFG_KAFKA));
        final Map<String, Object> zookeeperConfig = checkNotNull((Map<String, Object>) config.get(CFG_ZOOKEEPER));
        final ZkHosts hosts = new ZkHosts((String) zookeeperConfig.get("address"));
        final String CLIENT_ID = "storm-spout-" + environment;
        final SpoutConfig spoutConfig = new SpoutConfig(hosts,
                checkNotNull((String) kafkaConfig.get("input_topic")),
                checkNotNull((String) zookeeperConfig.get("root")),
                CLIENT_ID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("processor", new ProcessorBolt("json"), 4).shuffleGrouping("kafka-spout");
        builder.setBolt("kafka-output", new KafkaBolt<String, String>()
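The ack-in-finally advice at the top of this thread can be shown as a framework-free sketch (the class and field names are illustrative, not Storm's API): every input is acknowledged exactly once, even when parsing throws, so nothing lingers in the acker's pending map.

```java
import java.util.ArrayList;
import java.util.List;

public class AckInFinally {
    final List<String> acked = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();

    void execute(String rawJson) {
        try {
            if (!rawJson.startsWith("{")) {           // stand-in for JSON parsing
                throw new IllegalArgumentException("not json: " + rawJson);
            }
            emitted.add(rawJson);                     // stand-in for collector.emit
        } catch (Throwable t) {
            // fail fast: nothing to recover, the tuple is simply dropped
        } finally {
            acked.add(rawJson);                       // stand-in for collector.ack
        }
    }

    public static void main(String[] args) {
        AckInFinally bolt = new AckInFinally();
        bolt.execute("{\"ok\":true}");
        bolt.execute("garbage");
        System.out.println("emitted=" + bolt.emitted.size() + " acked=" + bolt.acked.size());
    }
}
```

If ack is only called on the success path, every tuple that throws stays pending until the message timeout, which matches the growing RotatingMap seen in the heap dump.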
Re: An Apache Storm bolt receive multiple input tuples from different spout/bolt
Hi, This answer may help you: http://stackoverflow.com/questions/30386338/sent-different-tuples-from-1-spout-to-different-bolt-in-apache-storm/30396802#30396802 2015-05-23 10:40 GMT+02:00 Chun Yuen Lim <limchuny...@siswa.um.edu.my>: > Is it possible for a bolt to receive input tuples from multiple spouts/bolts? For instance, Bolt C receives input tuples from Spout A and input tuples from Bolt B to be processed. How should I implement it? I mean writing the Java code for Bolt C and also its topology. -- Florian HUSSONNOIS
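For reference, Storm lets you declare one grouping per source on the same bolt, e.g. builder.setBolt("c", new BoltC()).shuffleGrouping("spout-a").shuffleGrouping("bolt-b"), and inside the bolt tuple.getSourceComponent() tells you where each tuple came from. A framework-free sketch of that dispatch (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// "Bolt C" receiving tuples that originate from two different sources,
// routed by the source component id -- the role getSourceComponent()
// plays in a real Storm bolt's execute(Tuple) method.
public class MultiSourceBolt {
    final List<String> fromA = new ArrayList<>();
    final List<String> fromB = new ArrayList<>();

    void execute(String sourceComponent, String value) {
        if ("spout-a".equals(sourceComponent)) {
            fromA.add(value);
        } else if ("bolt-b".equals(sourceComponent)) {
            fromB.add(value);
        }
    }

    public static void main(String[] args) {
        MultiSourceBolt c = new MultiSourceBolt();
        c.execute("spout-a", "a1");
        c.execute("bolt-b", "b1");
        c.execute("spout-a", "a2");
        System.out.println(c.fromA.size() + " from A, " + c.fromB.size() + " from B");
    }
}
```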
Re: local cluster with external zookeeper
Hi, You can create a LocalCluster with an external zookeeper as follows:

    new LocalCluster("localhost", 2181L)

2015-05-05 14:23 GMT+02:00 clay teahouse <clayteaho...@gmail.com>: > Hi All, Is it possible to run a topology in local cluster mode with an external zookeeper? I see the following JIRA, which seems to have been submitted to allow for an external zookeeper with a local cluster topology: https://issues.apache.org/jira/browse/STORM-213 > thank you, Clay -- Florian HUSSONNOIS Tel +33 6 26 92 82 23
How to register objects into TopologyContext ?
Hi, I would like to know if it is possible (or if it is a good idea) to use the executorData map from the TopologyContext for sharing objects (services) between tasks. How can I register objects into that map before the 'prepare' methods are invoked? Thank you for your time. -- Florian HUSSONNOIS
Re: Best way to clean-up log files in worker nodes
Hi, Have you tried editing the logback/cluster.xml file to enable log rotation? This file is periodically scanned, so you don't need to restart your supervisors. 2015-03-03 19:39 GMT+01:00 Nick R. Katsipoulakis <nick.kat...@gmail.com>: > Hello, I have had my cluster running for a while and my worker nodes' log files are getting really big. What is the best way to clear up some space? Should I just erase them, or move them and back them up somewhere else? > Thanks, Nikos -- Nikolaos Romanos Katsipoulakis, University of Pittsburgh, PhD candidate -- Florian HUSSONNOIS
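For reference, rotation in cluster.xml is configured through logback's standard RollingFileAppender. A minimal sketch of a size-based rotation policy (appender name, paths, and limits below are illustrative and should be adapted to your installation):

```xml
<appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender">
  <file>${storm.home}/logs/${logfile.name}</file>
  <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
    <fileNamePattern>${storm.home}/logs/${logfile.name}.%i</fileNamePattern>
    <minIndex>1</minIndex>
    <maxIndex>9</maxIndex>
  </rollingPolicy>
  <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
    <maxFileSize>100MB</maxFileSize>
  </triggeringPolicy>
  <encoder>
    <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
  </encoder>
</appender>
```

With maxIndex set to 9 and a 100MB trigger, each worker log is capped at roughly 1GB of history, and old files are deleted automatically instead of by hand.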
Tuples never reach bolts
Hi, Has anyone already faced tuples that never reach a bolt? On a project, we have deployed to a cluster a topology composed of 1 Kafka spout and 3 bolts. Messages are read from a Kafka queue. Tuples are then handled by a JSON parser bolt and re-emitted to a third bolt to be transformed. However, the tuples seem to never reach the last bolt. Looking into the Storm UI, we have observed that 100% of our stream is redirected to the bolt, but its execute method is never called. We have no errors in the worker logs. Thank you a lot for your help. -- Florian HUSSONNOIS
Re: Strange Id in Storm UI
Hi, Are you running a Trident topology? In that case, Trident has its own spouts/bolts to execute your functions. $mastercoord-bg0 is the MasterBatchCoordinator, a generic spout used by all Trident topologies. 2015-01-27 15:04 GMT+01:00 Denis DEBARBIEUX <ddebarbi...@norsys.fr>: > Dear all, I ran a topology in local mode. Now I would like to deploy it on a cluster with one node. I use the Storm UI to debug it. > - In local mode, the ids of the spouts/bolts were human readable: they were the names of my Java classes. > - On the cluster, the ids are very strange, like $mastercoord-bg0 or b-1---. > What did I do wrong? Thanks for your help. Denis -- Florian HUSSONNOIS
Re: Machine Learning in STORM
This project looks very interesting: http://samoa-project.net/ 2015-01-13 9:45 GMT+01:00 padma priya chitturi <padmapriy...@gmail.com>: > Mahout could be easily integrated. > On Tue, Jan 13, 2015 at 12:47 PM, Deepak Sharma <deepakmc...@gmail.com> wrote: > You can have a look at H2O's 0xdata. Thanks, Deepak. > On Tue, Jan 13, 2015 at 12:33 PM, Sridhar G <sridh...@microland.com> wrote: > Team, which Machine Learning libraries can be integrated into Storm? Regards, Sridhar -- Florian HUSSONNOIS Tel +33 6 26 92 82 23