
How to retrieve the offsets of messages with storm-kafka

2016-01-19 Thread Florian Hussonnois
Hi all,

I would like to know if there is a way to get the offset/partition of each
message when using the KafkaSpout.
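
For reference, one approach is a metadata-aware scheme. The sketch below is
only an assumption-laden illustration: it assumes a storm-kafka version that
ships MessageMetadataScheme with a ByteBuffer-based Scheme (as in the 1.x
line), and the extra field names are illustrative:

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

import org.apache.storm.kafka.MessageMetadataScheme;
import org.apache.storm.kafka.Partition;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Emits the message payload plus the partition id and offset it came from.
public class OffsetAwareScheme extends StringScheme implements MessageMetadataScheme {

    @Override
    public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message,
            Partition partition, long offset) {
        String payload = StringScheme.deserializeString(message);
        return Collections.<List<Object>>singletonList(
                new Values(payload, partition.partition, offset));
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(StringScheme.STRING_SCHEME_KEY, "partition", "offset");
    }
}

// Wired into the spout config so the metadata-aware path is used:
// spoutConfig.scheme = new MessageMetadataSchemeAsMultiScheme(new OffsetAwareScheme());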

Thanks in advance

-- 
Florian HUSSONNOIS


Re: storm cassandra integration

2015-12-08 Thread Florian Hussonnois
Hi Crina,

You can fork the external storm-cassandra module from the master branch. This
module should be compatible with Storm versions 0.9.5 through 0.11.
The current fluent API is not stable and will be deprecated, so you should
directly use the following class:

https://github.com/apache/storm/blob/master/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BaseCassandraBolt.java



2015-12-08 14:47 GMT+01:00 Crina Arsenie <arsenie.cr...@gmail.com>:

> Hello,
>
>
>
> I was planning to integrate Cassandra with a Storm project I am working
> on. I am using the latest stable version of Storm (0.10). But then I
> found that the storm-cassandra component among Storm's external modules
> is available only starting from the GitHub master branch, so not with
> this version.
>
> Therefore, I searched for an earlier storm-cassandra component and found
> https://github.com/hmsonline/storm-cassandra (latest stable 0.3.1). I
> then had problems using it, as it builds against neither the latest
> version of Storm (the old 0.8.1 instead) nor of Cassandra (1.1.2 instead
> of the current 2.2.3), and the worst part, which gave me headaches before
> I understood the errors I got when integrating it, is that it was
> compiled under Java 1.6!
>
> So my question is: do you think there is, at the current state, a proper
> way to integrate Cassandra bolts?
>
>
> Thank you in advance for your help.
>
> Crina
>
>


-- 
Florian HUSSONNOIS


Yet another Storm Cassandra connector

2015-10-27 Thread Florian Hussonnois
Hi all,

I would like to share with you a new connector between Storm and Cassandra.
This library provides a fluent API for building CQL bolts. Queries can be
executed and handled asynchronously by the bolts (via the DataStax Java driver).

https://github.com/fhussonnois/storm-cassandra

Thanks for your feedback.

-- 
Florian HUSSONNOIS


Re: Field Group Hash Computation

2015-10-06 Thread Florian Hussonnois
> Thanks Derek. I use strings and I still end up with some bolts
> having the maximum requests :(
>
> On Tue, Sep 29, 2015 at 5:03 PM, Derek Dagit <der...@yahoo-inc.com> wrote:
>
>> The code that hashes the field values is here:
>>
>> https://github.com/apache/storm/blob/9d911ec1b4f7b5aabe646a5d2cd31591fe4df1b0/storm-core/src/clj/backtype/storm/tuple.clj#L24
>>
>> You can write a little java program, something like:
>>
>> public static void main(String[] args) {
>>     ArrayList<String> myList = new ArrayList<>();
>>     myList.add("first field value");
>>     myList.add("second field value");
>>
>>     int hash = Arrays.deepHashCode(myList.toArray()); // as in tuple.clj
>>
>>     System.out.println("hash is " + hash);
>>     int numTasks = 32;
>>
>>     System.out.println("task index is " + hash % numTasks);
>> }
>>
>> There are certain types of values that may not hash consistently.
>> If you are using String values, then it should be fine. Other types
>> may or may not, depending on how the class implements hashCode().
>>
>> --
>> Derek
>>
>> ________________________________
>> From: Kashyap Mhaisekar <kashya...@gmail.com>
>> To: user@storm.apache.org
>> Sent: Tuesday, September 29, 2015 4:28 PM
>> Subject: Field Group Hash Computation
>>
>> Hi,
>> I have a field grouping based on 2 fields. I have 32 consumers for the
>> tuple, and I see that most of the time, out of 64 bolts, the field group
>> always lands on 8 of them. Of the 8, 2 have more than 60% of the data.
>> The data for the field grouping can have 20 different combinations.
>>
>> Do you know the way to compute the hash of the fields used for grouping?
>> One of the group's mails indicates that the approach is:
>>
>> It calls "hashCode" on the list of selected values and mods it by the
>> number of consumer tasks. You can play around with that function to see
>> if something about your data is causing something degenerative to happen
>> and cause skew.
>>
>> I saw the clojure code but am not sure how to understand it.
>>
>> Thanks
>> Kashyap


-- 
Florian HUSSONNOIS


Trident - how to implement distinct() operation ?

2015-09-17 Thread Florian Hussonnois
Hi all,

In Trident, what is the best way to keep only distinct tuples based on
specified fields? This operation must be applied per batch.

I ended up implementing an Aggregator as follows:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import backtype.storm.tuple.Fields;
import storm.trident.operation.BaseAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class Distinct extends BaseAggregator<Map<List<Object>, List<Object>>> {

    private final Fields fields;

    public Distinct(Fields fields) {
        this.fields = fields;
    }

    @Override
    public Map<List<Object>, List<Object>> init(Object batchId,
            TridentCollector collector) {
        return new HashMap<>();
    }

    @Override
    public void aggregate(Map<List<Object>, List<Object>> state,
            TridentTuple tuple, TridentCollector collector) {
        // Build the key from the values of the distinct fields only.
        List<Object> key = fields.toList().stream()
                .map(tuple::getValueByField)
                .collect(Collectors.toList());
        // Keep the first tuple seen for each key within the batch.
        if (!state.containsKey(key)) {
            state.put(key, tuple.getValues());
        }
    }

    @Override
    public void complete(Map<List<Object>, List<Object>> state,
            TridentCollector collector) {
        state.values().forEach(collector::emit);
    }
}


However, that implementation seems cumbersome because we have to
declare all the input/output fields:

stream.partitionBy(new Fields("type"))
      .partitionAggregate(new Fields("sensor-id", "type", "ts"),
                          new Distinct(new Fields("type")),
                          new Fields("sensor-id", "type", "ts"));

Another solution could be to use a Filter, but is there a way to get the
batch ID?

A nice feature would be to have this operation directly on the Stream
class:

stream.distinct(new Fields("type")) and stream.partitionDistinct(new
Fields("type")).

Thank you in advance.

-- 
Florian HUSSONNOIS


Re: Workers maxing out heap, excessive GC, mk_acker_bolt-pending map is huge (0.9.5)

2015-08-21 Thread Florian Hussonnois
Hi,

You should ack the input tuple after emitting the new ones:

try {
    // parse json string
    ...
    // then emit
} catch (Throwable t) {
    /* nothing to recover */
} finally {
    collector.ack(tuple);
}

Hope this will fix your issue.
On 21 Aug 2015 at 02:17, Jason Chen <jason.c...@hulu.com> wrote:

 Hi all.

 Here’s what I’m seeing.  I’ve got a fairly simple topology, consisting of
 3 bolts.  Kafka spout, simple processing bolt (JSON parse to POJO, a bit of
 processing, and back to JSON), and Kafka Bolt (output).  12 workers,
 Xmx1G.  It runs happily for a little over a day, then basically slows
 down/stops processing altogether.  Cluster is instrumented with
 storm-graphite (https://github.com/verisign/storm-graphite).  When the
 topology is freshly deployed, spout complete latency averages around *5ms*,
 and JVM heap usage starts at around *100MB* across all workers.  Very
 little PSMarkSweep GC activity.  The workers slowly creep up in heap usage
 across all workers, until they start brushing up against max heap.  At this
 point, the topology is pretty sad: spout complete latency averages *5sec*,
 spout lag starts to increase, spout fail counts average *300*/min, and
 PSMarkSweep GC averages *15+ seconds per run* (!!!) averaging ~2 GC
 runs/worker/minute.  The JVMs are pretty much hosed at this point, and I
 stop seeing the topology doing much useful work.

 I took a heapdump via JVisualVM of one of the maxed out workers.  The
 majority of the heap usage is dominated by the following structure:


 ==================================================================
 field              type                             retained heap
 ------------------------------------------------------------------
 this               acker                            1,132,188,840
 state              Container                        1,132,188,816
 object             acker$mk_acker_bolt$reify__803   1,132,188,792
 output_collector   MutableObject                    184
 pending            MutableObject                    1,132,188,568
   o                RotatingMap                      1,132,188,544
 ...

 Within the acker’s pending map, eventually we get to a huge HashMap
 (8,388,608 items!) of <Long, PersistentArrayMap<Keyword, Long>> entries,
 e.g.:
 key: 3133635607298342113, value: [ clojure.lang.Keyword #11, 7092611081953912005 ]

 This is probably what’s causing my workers to run out of heap.  But why is
 each worker keeping track of so many pending tuples?  My processing
 RichBolt acks immediately (since I fail-fast if I run into any parsing
 issues), so I can’t think of a reason why so many tuples (8 million+)
 would be pending.

 Any ideas what might be causing this apparent leak?  I feel like we’re
 running a pretty stock topology.  Restarting the topology every day is the
 only reliable way to keep my topology running, not exactly the most
 scalable solution :p

 *Here are more details about my topology (including settings that I
 believe I’ve changed away from the default).*
 * storm-core, storm-kafka *0.9.5*
 * one acker per worker, so 12 ackers.

 *.yaml Config:*
 ********************
 topology.debug: false
 topology.max.spout.pending: 2500 # The maximum number of tuples that can
 be pending on a spout task at any given time.
 topology.spout.max.batch.size: 65 * 1024
 topology.workers: 12

 topology.worker.childopts: -Dcom.sun.management.jmxremote
 -Dcom.sun.management.jmxremote.ssl=false
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.port=1%ID% -XX:+HeapDumpOnOutOfMemoryError
 -XX:HeapDumpPath=/tmp/
 ********************

 *Topology Construction:*
 ********************
 public static void buildProcessingTopology(final TopologyBuilder builder,
         final Config config, final String environment) {
     final Map<String, Object> kafkaConfig =
             checkNotNull((Map<String, Object>) config.get(CFG_KAFKA));
     final Map<String, Object> zookeeperConfig =
             checkNotNull((Map<String, Object>) config.get(CFG_ZOOKEEPER));

     final ZkHosts hosts = new ZkHosts((String) zookeeperConfig.get("address"));
     final String CLIENT_ID = "storm-spout-" + environment;
     final SpoutConfig spoutConfig = new SpoutConfig(hosts,
             checkNotNull((String) kafkaConfig.get("input_topic")),
             checkNotNull((String) zookeeperConfig.get("root")), CLIENT_ID);
     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

     builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);

     builder.setBolt("processor", new ProcessorBolt("json"),
             4).shuffleGrouping("kafka-spout");

     builder.setBolt("kafka-output",
             new KafkaBolt<String, String>()

Re: An Apache Storm bolt receive multiple input tuples from different spout/bolt

2015-05-24 Thread Florian Hussonnois
Hi, this answer may help you:
http://stackoverflow.com/questions/30386338/sent-different-tuples-from-1-spout-to-different-bolt-in-apache-storm/30396802#30396802
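
In short, a bolt can simply declare one grouping per upstream component. A
minimal sketch using the standard TopologyBuilder API (component names and
the SpoutA/BoltB/BoltC classes are illustrative):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout-A", new SpoutA());
builder.setBolt("bolt-B", new BoltB()).shuffleGrouping("spout-A");

// Bolt C subscribes to both Spout A and Bolt B.
builder.setBolt("bolt-C", new BoltC())
       .shuffleGrouping("spout-A")
       .shuffleGrouping("bolt-B");

// Inside BoltC.execute(Tuple tuple), the source can be distinguished with
// tuple.getSourceComponent() if the two streams need different handling.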

2015-05-23 10:40 GMT+02:00 Chun Yuen Lim limchuny...@siswa.um.edu.my:

 Is it possible for a bolt to receive input tuples from different
 spouts/bolts?
 For instance, Bolt C receives input tuples from Spout A and input tuples
 from Bolt B to be processed. How should I implement it? I mean writing the
 Java code for Bolt C and also its topology.




-- 
Florian HUSSONNOIS


Re: local cluster with external zookeeper

2015-05-05 Thread Florian Hussonnois
Hi,

You can create a LocalCluster with an external zookeeper as follows:
new LocalCluster("localhost", 2181L)
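
For example, a minimal sketch (assuming a Storm version whose LocalCluster
exposes the (String zkHost, Long zkPort) constructor; the topology name and
builder are illustrative):

// Run a topology against an already-running external zookeeper on localhost:2181.
LocalCluster cluster = new LocalCluster("localhost", 2181L);

Config conf = new Config();
cluster.submitTopology("my-topology", conf, builder.createTopology());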

2015-05-05 14:23 GMT+02:00 clay teahouse clayteaho...@gmail.com:

 Hi All,

 Is it possible to run a topology in local cluster mode, with an external
 zookeeper? I see the following jira which seems to have been submitted to
 allow for an external zookeeper with local cluster topology.

 https://issues.apache.org/jira/browse/STORM-213

 thank you
 Clay






-- 
Florian HUSSONNOIS
Tel +33 6 26 92 82 23


How to register objects into TopologyContext ?

2015-03-06 Thread Florian Hussonnois
Hi

I would like to know if it is possible (or if it is a good idea) to use the
executorData map from the TopologyContext to share objects (services)
between tasks.

How can I register objects into that map before the 'prepare' methods are
invoked?
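
If registering into the TopologyContext turns out not to be possible, a
common workaround (a sketch only, not a TopologyContext feature; MyService
is a hypothetical class) is a lazily-initialized per-worker singleton, since
all tasks of a worker share the same JVM:

import java.util.Map;

public final class WorkerServices {

    // One shared instance per worker JVM.
    private static volatile MyService service;

    private WorkerServices() {}

    // Called from each bolt's prepare(); creates the service on first use.
    public static MyService get(Map stormConf) {
        if (service == null) {
            synchronized (WorkerServices.class) {
                if (service == null) {
                    service = new MyService(stormConf); // hypothetical constructor
                }
            }
        }
        return service;
    }
}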

Thank you for your time.

-- 
Florian HUSSONNOIS


Re: Best way to clean-up log files in worker nodes

2015-03-03 Thread Florian Hussonnois
Hi,

Have you tried editing the logback/cluster.xml file to enable log rotation?
This file is scanned periodically, so you don't need to restart your
supervisors.
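
For instance, a size-based rolling appender along these lines (a sketch
using standard logback classes; the appender name, paths, and size limits
are illustrative and may differ from what your cluster.xml already contains):

<appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender">
  <file>${storm.home}/logs/${logfile.name}</file>
  <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
    <fileNamePattern>${storm.home}/logs/${logfile.name}.%i</fileNamePattern>
    <minIndex>1</minIndex>
    <maxIndex>9</maxIndex>
  </rollingPolicy>
  <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
    <maxFileSize>100MB</maxFileSize>
  </triggeringPolicy>
  <encoder>
    <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
  </encoder>
</appender>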

2015-03-03 19:39 GMT+01:00 Nick R. Katsipoulakis nick.kat...@gmail.com:

 Hello,

 I have had my cluster running for a while, and my worker nodes' log files
 are getting really big. What is the best way to clear up some space?
 Should I just erase them, or move them and back them up somewhere else?

 Thanks,
 Nikos
 --
 Nikolaos Romanos Katsipoulakis,
 University of Pittsburgh, PhD candidate




-- 
Florian HUSSONNOIS


Tuples never reach bolts

2015-02-26 Thread Florian Hussonnois
Hi,

Has anyone already faced tuples that never reach a bolt?

On a project, we have deployed to a cluster a topology composed of 1 kafka
spout and 3 bolts.

Messages are read from a kafka queue. Then, tuples are handled by a json
parser bolt and re-emitted to a third bolt to be transformed. However, the
tuples seem to never reach that last bolt.
Looking at the Storm UI, we have observed that 100% of our stream is
redirected to the bolt, but its execute method is never called.

We have no errors in the worker logs.

Thank you for your help.

-- 
Florian HUSSONNOIS


Re: Strange Id in Storm UI

2015-01-27 Thread Florian Hussonnois
Hi,

Are you running a Trident topology? In that case, Trident has its own
spouts/bolts to execute your functions. $mastercoord-bg0 is the
MasterBatchCoordinator, which is a generic spout used by all Trident
topologies.




2015-01-27 15:04 GMT+01:00 Denis DEBARBIEUX ddebarbi...@norsys.fr:

  Dear all,

 I ran a topology in local mode. Now I would like to deploy it on a cluster
 with one node. I use the Storm UI to debug it.
 - In local mode, the ids of the spouts/bolts were human readable: they
 were the names of my Java classes.
 - On the cluster, the ids are very strange, like *$mastercoord-bg0* or
 *b-1---*

 What did I do wrong?

 Thanks for your help

 Denis






-- 
Florian HUSSONNOIS


Re: Machine Learning in STORM

2015-01-13 Thread Florian Hussonnois
This project looks very interesting : http://samoa-project.net/

2015-01-13 9:45 GMT+01:00 padma priya chitturi padmapriy...@gmail.com:

 Mahout could be easily integrated.

 On Tue, Jan 13, 2015 at 12:47 PM, Deepak Sharma deepakmc...@gmail.com
 wrote:

 You can have a look at H2O's 0xdata.

 Thanks
 Deepak

 On Tue, Jan 13, 2015 at 12:33 PM, Sridhar G sridh...@microland.com
 wrote:

 Team,



 Which machine learning libraries can be integrated with Storm?



 Regards,

 Sridhar

 *Believe in the impossible and remove the improbable*







 --
 Thanks
 Deepak
 www.bigdatabig.com
 www.keosha.net





-- 
Florian HUSSONNOIS
Tel +33 6 26 92 82 23