unsubscribe

2019-03-31 Thread Ambud Sharma
unsubscribe


Re: Apache Storm - help in investigating cause of failures in ~20% of total events

2017-10-24 Thread Ambud Sharma
Without anchoring, at-least-once semantics are not honored, i.e. if an event is
lost the Kafka spout doesn't replay it.
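
For context, a minimal sketch of a bolt that anchors its emits to the incoming tuple, so that a downstream failure propagates back to the spout (Storm 1.x package names are shown; storm-kafka-0.9.x setups use backtype.storm.* instead, and the class/field names are illustrative):

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AnchoredEnrichBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String event = input.getString(0);
            // Anchoring (passing the input as the first argument) ties the new
            // tuple to the original tuple tree, so a downstream fail() marks the
            // whole tree as failed and the spout can replay it.
            collector.emit(input, new Values(event.toUpperCase()));
            collector.ack(input);
        } catch (Exception e) {
            // Failing the input asks the spout to replay it (if the spout supports replay).
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));
    }
}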

On Oct 1, 2017 6:12 AM, "Yovav Waichman" 
wrote:

> Hi,
>
>
>
> We are using Apache Storm for a couple of years, and everything was fine
> till now.
>
> For our spout we are using “storm-kafka-0.9.4.jar”.
>
>
>
> Lately, we started seeing that our “Failed” number of events has increased
> dramatically, and currently almost 20% of our total events are marked as
> Failed.
>
>
>
> We tried investigating our Topology logs, but we came up empty handed.
> Also checking our DB logs didn’t give us any clue as for heavy load on our
> system.
>
> Moreover, our spout complete latency is 25.996 ms, which overruled any
> timeouts that might occur.
>
>
>
> Lowering our max pending value has produced a negative result.
>
> At some point, since we are not using anchoring, we thought about adding
> anchoring, but we saw that the KafkaSpout handles failures by replaying
> them, so we were not sure whether to add it or not.
>
>
>
> It would be helpful if you can direct us as to where we can find in Storm
> logs the reason for these failures, if there’s an exception which is not
> caught, maybe a time out, since we are a bit blind at the moment.
>
>
>
> We would appreciate any help with that.
>
>
>
> Thanks in advance,
>
> Yovav
>


Re: Is Java8 supported by Storm?

2017-07-16 Thread Ambud Sharma
Java 8 and Storm 1.x work just fine. I have used Java 8 with 0.10 in
production as well.
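
As a concrete illustration (not an official compatibility statement), bolts compiled with -source/-target 1.8 can freely use Java 8 language features such as lambdas and streams; the class below is only a sketch:

import java.util.Arrays;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class Java8SplitBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Java 8 streams and lambdas inside an ordinary bolt.
        Arrays.stream(input.getString(0).split("\\s+"))
              .map(String::toLowerCase)
              .forEach(word -> collector.emit(new Values(word)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}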

On Mon, Jul 10, 2017 at 7:06 AM, Bobby Evans  wrote:

> I filed https://issues.apache.org/jira/browse/STORM-2620 to update the
> docs and put up some pull requests to clarify things for the 1.x and 2.x
> branches (linked from the JIRA).
>
>
>
>
> - Bobby
>
>
>
> On Monday, July 10, 2017, 8:28:38 AM CDT, Bobby Evans 
> wrote:
>
>
> The integration tests for the 0.10 versions of storm were run against java 6.
> The 1.x versions of storm are run against both java 7 and java 8.  The 2.x
> versions of storm are currently only run against java 8, but we expect to
> add in java 9 once travis ci supports it.
>
> Internally my team runs storm only on java 8, so it should work.  I will
> file a JIRA to update the docs to indicate what is correct.
>
>
> - Bobby
>
>
>
> On Monday, July 10, 2017, 8:18:49 AM CDT, Xin Wang 
> wrote:
>
>
> I'm not sure of the release time of Storm 2.0; the first beta version may be
> several weeks away. Storm 1.1.1 or 1.2 will be released ASAP.
>
> -Xin
>
> 2017-07-10 20:09 GMT+08:00 Hector Garzón 
> :
>
> Ok, thanks for the info.
> When is Storm v2.0 planned to be released? We need to decide whether to migrate
> all our code to java7 or wait for the Storm 2.0 release.
>
> Thanks for your help.
>
> 2017-07-10 13:38 GMT+02:00 Xin Wang :
>
> Hi Hector,
>
> Storm 2.0 needs JDK 1.8. Seems like the documents should be updated.
> And you can write a bolt using a Java 8 lambda. Example:
> https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java
>
> Thanks,
> Xin
>
> 2017-07-10 19:13 GMT+08:00 Hector Garzón  >:
>
> Hello,
> As indicated in the official documentation
> (http://storm.apache.org/releases/2.0.0-SNAPSHOT/Setting-up-a-Storm-cluster.html),
> the only java version tested with Storm is Java 7, and there is no guarantee
> of Storm working on another java version.
>
> So, my question is if there is any plan to support java8 (I can't see any
> project roadmap)?
>
> Has anybody already used Java8 on cluster mode on Production environment?
>
> Thanks
>
>
>
>
>


Re: Decreasing value of Complete Latency in Storm UI

2017-07-16 Thread Ambud Sharma
If I may add, it is also explained by the potential surge of tuples when the
topology starts, which will eventually reach an equilibrium with the normal
latency of your topology components.

On Jul 14, 2017 4:29 AM, "preethini v"  wrote:

> Hi,
>
> I am running WordCountTopology with 3 worker nodes. The parallelism of
> spout, split and count is 5, 8 and 12 respectively. I have enabled acking
> to measure the complete latency of the topology.
>
> I am considering  complete latency as a measure of end-to-end latency.
>
> The Complete latency is the time a tuple is emitted by a Spout until
> Spout.ack() is called.  Thus, it is the time from tuple being emitted,
> the tuple processing time, the time it spends in the internal input/output
> buffers and until the ack for the tuple is received by the Spout.
>
> The stats from storm UI show that the complete latency for a topology
> keeps decreasing with time.
>
> 1. Is this normal?
> 2. If yes, What explains the continuous decreasing complete latency value?
> 3. Is complete latency a good measure of end-to-end latency of a topology?
>
> Thanks,
> Preethini
>


Re: Storm topology freezes and does not process tuples from Kafka

2017-07-16 Thread Ambud Sharma
Please check if you have orphan workers. Orphan workers happen when a
topology is redeployed in a short period of time and the old workers
haven't yet been cleaned up.

Check this by running ps aux | grep java (or grep for a specific jar keyword if you have one).

On Jul 14, 2017 11:17 PM, "Sreeram"  wrote:

> Thank you Taylor for replying.
>
> I checked the worker logs and there were no messages that get printed
> once the worker goes into freeze( I had the worker logs set at ERROR
> level ).
>
> Regarding the components, I have kafka spout in the topology and bolts
> that write to HBase.
>
> I had been rebalancing the topology with wait time of 30seconds. Is it
> recommended to match it with topology.message.timeout.secs ?
>
> Please let me know if you need any specific info.
>
> Thanks,
> Sreeram
>
> On Sat, Jul 15, 2017 at 12:39 AM, P. Taylor Goetz 
> wrote:
> >> Supervisor log at the time of freeze looks like below
> >>
> >> 2017-07-12 14:38:46.712 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >
> >
> > There are two situations where you would see those messages: When a
> topology is first deployed, and when a worker has died and is being
> restarted.
> >
> > I suspect the latter. Have you looked at the worker logs for any
> indication that the workers might be crashing and what might be causing it?
> >
> > What components are involved in your topology?
> >
> > -Taylor
> >
> >
> >> On Jul 12, 2017, at 5:26 AM, Sreeram  wrote:
> >>
> >> Hi,
> >>
> >> I am observing that my storm topology intermediately freezes and does
> >> not continue to process tuples from Kafka. This happens frequently and
> >> when it happens this freeze lasts for 5 to 15 minutes. No content is
> >> written to any of the worker log files during this time.
> >>
> >> The version of storm I use is 1.0.2 and Kafka version is 0.9.0.
> >>
> >> Any suggestions to solve the issue ?
> >>
> >> Thanks,
> >> Sreeram
> >>
> >> Supervisor log at the time of freeze looks like below
> >>
> >> 2017-07-12 14:38:46.712 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >> 2017-07-12 14:38:47.212 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >> 2017-07-12 14:38:47.712 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >> 2017-07-12 14:38:48.213 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >> 2017-07-12 14:38:48.713 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >> 2017-07-12 14:38:49.213 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >> 2017-07-12 14:38:49.713 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >> 2017-07-12 14:38:50.214 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >> 2017-07-12 14:38:50.714 o.a.s.d.supervisor [INFO]
> >> d8958816-5bc8-449e-94e3-87ddbb2c3d02 still hasn't started
> >>
> >>
> >> Thread stacks (sample)
> >> Most of worker threads during this freeze period look like one of the
> >> below two stack traces.
> >>
> >> Thread 104773: (state = BLOCKED)
> >> - sun.misc.Unsafe.park(boolean, long) @bci=0 (Compiled frame;
> >> information may be imprecise)
> >> - java.util.concurrent.locks.LockSupport.parkNanos(java.lang.Object,
> >> long) @bci=20, line=215 (Compiled frame)
> >> - java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(
> java.util.concurrent.SynchronousQueue$TransferStack$SNode,
> >> boolean, long) @bci=160, line=460 (Compil
> >> ed frame)
> >> - java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.
> lang.Object,
> >> boolean, long) @bci=102, line=362 (Compiled frame)
> >> - java.util.concurrent.SynchronousQueue.poll(long,
> >> java.util.concurrent.TimeUnit) @bci=11, line=941 (Compiled frame)
> >> - java.util.concurrent.ThreadPoolExecutor.getTask() @bci=134,
> >> line=1066 (Compiled frame)
> >> - java.util.concurrent.ThreadPoolExecutor.runWorker(
> java.util.concurrent.ThreadPoolExecutor$Worker)
> >> @bci=26, line=1127 (Compiled frame)
> >> - java.util.concurrent.ThreadPoolExecutor$Worker.run() @bci=5,
> >> line=617 (Compiled frame)
> >> - java.lang.Thread.run() @bci=11, line=745 (Compiled frame)
> >>
> >> Thread 147495: (state = IN_NATIVE)
> >> - sun.nio.ch.EPollArrayWrapper.epollWait(long, int, long, int) @bci=0
> >> (Compiled frame; information may be imprecise)
> >> - sun.nio.ch.EPollArrayWrapper.poll(long) @bci=18, line=269 (Compiled
> frame)
> >> - sun.nio.ch.EPollSelectorImpl.doSelect(long) @bci=28, line=93
> (Compiled frame)
> >> - sun.nio.ch.SelectorImpl.lockAndDoSelect(long) @bci=37, line=86
> >> (Compiled frame)
> >> - sun.nio.ch.SelectorImpl.select(long) @bci=30, line=97 (Compiled
> frame)
> >> - org.apache.kafka.common.network.Selector.select(long) 

Re: Architecture Bolt Design Questions

2017-03-27 Thread Ambud Sharma
Please have a look at Storm DRPC to see if that would make things easier for
you.
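
Separately, for the fieldsGrouping approach described in the question below, the topology wiring would look roughly like this fragment (CredentialSpout, ApiFetchBolt, and the "userId" field are placeholder names):

// Fragment only: assumes a spout that emits a "userId" field per account.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("credentials-spout", new CredentialSpout(), 1);
// fieldsGrouping on the user id sends all tuples for a given user to the
// same ApiFetchBolt task, so one account is never processed by two tasks
// at the same time.
builder.setBolt("api-fetch", new ApiFetchBolt(), 16)
       .fieldsGrouping("credentials-spout", new Fields("userId"));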

On Mar 16, 2017 6:45 AM, "Steve Robert"  wrote:

Hi guys,
I have a simple question with a simple scenario:
is it good practice to use a bolt to connect to an external data source?

Here is a concrete example:
I want to connect to an external API to retrieve data for multiple users.
I have a Cassandra data source which stores the credentials for each user.
I created a spout that reads the database and provides a stream of these accounts.

To parallelize the downloads of the users' data, I wanted to create a
bolt with parallelism and a *fieldsGrouping* strategy, to avoid the same
message being processed twice at the same time.

The bolt could therefore consume the credential metadata to connect to and
get data from the external API.
Do you think this mechanism is acceptable?

I know I could use a first spout to read the credential users and send them to
Kafka, and create a new spout to read Kafka; this spout could then connect to
two data sources, Kafka for the credentials and the external API for the
users' data.

But I find this mechanism cumbersome for a simple scenario.

Thanks for your help!

-- 
Steve Robert 
Software Engineer
srob...@qualys.com
Qualys, Inc. – Continuous Security



Re: Storm Kafka offset monitoring

2017-02-24 Thread Ambud Sharma
https://github.com/srotya/kafka-monitoring-tool

You can use this or the original project as a standalone monitoring tool
for offsets.

On Thu, Feb 23, 2017 at 10:52 AM, pradeep s 
wrote:

> Hi Priyank,
> The confusion is on whats the proper implementation of spout .
>
> *Method 1*
> *=*
> BrokerHosts hosts = new ZkHosts(zkConnString);
> SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" +
> topicName, spoutConfigId);
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
> This is working fine. The only disadvantage is that offsets are maintained
> in zookeeper. As per the latest kafka docs, offsets can be maintained
> in the Kafka consumer offsets topic.
>
> Tried with method 2 for maintaining offsets in Kafka.
> *Method 2*
> *===*
>  KafkaSpoutConfig kafkaSpoutConfig = newKafkaSpoutConfig();
>
> KafkaSpout spout = new
> KafkaSpout<>(kafkaSpoutConfig);
>
> private static KafkaSpoutConfig newKafkaSpoutConfig() {
>
> Map props = new HashMap<>();
> props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS,
> bootstrapServers);
> props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
> props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
> "org.apache.kafka.common.serialization.
> StringDeserializer");
> props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
> "org.apache.kafka.common.serialization.
> StringDeserializer");
> props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "false");
>
>
> String[] topics = new String[1];
> topics[0] = topicName;
>
> KafkaSpoutStreams kafkaSpoutStreams =
> new KafkaSpoutStreams.Builder(new Fields("message"), new
> String[] { topicName }).build();
> new KafkaSpoutStreamsNamedTopics.Builder(new
> Fields("message"), topics).build();
>
> KafkaSpoutTuplesBuilder tuplesBuilder =
> new KafkaSpoutTuplesBuilder.Builder<>(new
> TuplesBuilder(topicName)).build();
> new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new
> TuplesBuilder(topicName)).build();
>
> KafkaSpoutConfig spoutConf =
> new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams,
> tuplesBuilder).build();
>
> return spoutConf;
> }
>
>
> But this method was showing spout errors as attached in logs.
>
> *Is it ok to maintain offsets in zookeeper *. Any disadvantages with this
> approach. Please suggest.
>
> Logs attached .
>
> Regards
> Pradeep S
>
>
>
> On Wed, Feb 22, 2017 at 5:25 PM, Priyank Shah 
> wrote:
>
>> Hi Pradeep,
>>
>>
>>
>> Both the spouts are correct and good to use. However, we should use the
>> newer spout, the one that uses broker hosts, because it uses the latest
>> kafka consumer api. When do you get the commit failed exception? Can you send
>> your worker logs from where the spout is running? If you don’t see anything in
>> the logs, try changing the log level from the ui.
>>
>>
>>
>> *From: *pradeep s 
>> *Reply-To: *"user@storm.apache.org" 
>> *Date: *Tuesday, February 21, 2017 at 5:14 PM
>>
>> *To: *"user@storm.apache.org" 
>> *Subject: *Re: Storm Kafka offset monitoring
>>
>>
>>
>> Hi Priyank
>>
>> Currently I tested the spout using zookeeper broker hosts .This is
>> processing fine.But I saw another way of initialising spout using Kafka
>> bootstrap severs using kafkaspoutconfig class.
>>
>>
>>
>> https://storm.apache.org/releases/1.0.2/javadocs/org/apache/
>> storm/kafka/spout/KafkaSpoutConfig.Builder.html
>>
>>
>>
>>
>>
>> But implementing this way I was getting commit failed exception due to
>> rebalance
>>
>> Can you point out what's the proper way for implementing Kafka spout.
>>
>> In storm 1.0.3 docs I have seen the way using zookeeper broker hosts
>>
>> Regards
>>
>> Pradeep S
>>
>> On Tue, Feb 21, 2017 at 2:35 PM Priyank Shah 
>> wrote:
>>
>> Hi Pradeep,
>>
>>
>>
>> A release vote for RC1 of 1.1.0 is in progress. You can track that and
>> once it gets released you can upgrade.
>>
>>
>>
>> Regarding upgrading your spout, you don’t need to make any code changes.
>> You just need to use the latest released spout code. Usually it involves
>> updating a pom file that has a dependency on storm-kafka module to latest
>> version.
>>
>>
>>
>> *From: *pradeep s 
>> *Reply-To: *"user@storm.apache.org" 
>> *Date: *Tuesday, February 21, 2017 at 12:49 PM
>> *To: *"user@storm.apache.org" 
>> *Subject: *Re: Storm Kafka offset monitoring
>>
>>
>>
>> Hi Priyank
>>
>> Thanks for your reply.i was not able to find 1.1.0 version of storm
>>
>> Can you please point to 

Re: Storm Flux Viewer

2017-02-04 Thread Ambud Sharma
Sure, I will open a pull request and ticket.

On Feb 2, 2017 9:45 PM, "Jungtaek Lim" <kabh...@gmail.com> wrote:

> Looks great! Thanks for sharing.
>
> I think it's worth including in the flux submodule or as one of the pages in
> the UI, but only if you are open to contributing your work to the Storm project.
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Fri, Feb 3, 2017 at 12:03 PM, Xin Wang <data.xinw...@gmail.com> wrote:
>
>> Hi Ambud,
>>
>> Thanks for your nice work. I tested it. Looks good. This can be a useful
>> tool for flux users.
>>
>> - Xin
>>
>> 2017-02-03 5:08 GMT+08:00 Ambud Sharma <asharma52...@gmail.com>:
>>
>> Put together a simple webpage to visualize Flux YAML files to help
>> troubleshooting and development of Flux based topologies.
>>
>> https://github.com/ambud/flux-viewer
>>
>>
>>


Storm Flux Viewer

2017-02-02 Thread Ambud Sharma
Put together a simple webpage to visualize Flux YAML files to help
troubleshooting and development of Flux based topologies.

https://github.com/ambud/flux-viewer


Re: Why aren't storm topology send and receive buffers equal sized?

2017-02-01 Thread Ambud Sharma
ArrayList is a resizable data structure; it will resize when more tuples
come in.

More importantly, this is simply a disruptor optimization step to avoid
calling the next sequence for each tuple. Rather, it calls it once for 8
tuple slots at a time.
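
For reference, these buffers can be tuned per topology; the constant names below exist in Storm 0.9.x/1.x (an assumption worth verifying against your release), and the values shown are just the documented defaults:

import org.apache.storm.Config;

Config conf = new Config();
// Simple ArrayList-backed buffer local to the worker's receive thread (default 8).
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
// LMAX Disruptor queues (defaults 1024); sizes must be powers of two.
conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 1024);
conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 1024);
conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 1024);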




On Jan 27, 2017 1:53 AM, "Navin Ipe" 
wrote:

I found a part of the answer in the article itself "Note that
topology.receiver.buffer.size is in contrast to the other buffer size
related parameters described in this article actually not configuring the
size of an LMAX Disruptor queue. Rather it sets the size of a simple
ArrayList that is used to buffer incoming messages because in this specific
case the data structure does not need to be shared with other threads, i.e.
it is local to the worker’s receive thread."

But the question still remains. If Bolt A sends 1024 tuples, then won't it
overwhelm Bolt B?


On Fri, Jan 27, 2017 at 3:11 PM, Navin Ipe 
wrote:

> Hi,
>
> When going through Michael Noll's explanation
> 
> on the buffers, I noticed the topology.receive.buffer.size default is 8 and
> topology.send.buffer.size default is 1024.
>
> If I have a topology where Spout --> Bolt A ---> BoltB, then if Bolt A
> sends a tuple of size 1024, then BoltB won't be able to handle 1024 easily
> and will get overwhelmed because Bolt B's receive buffer size is just 8.
>
> Is my above understanding correct? Why are the buffers so different in
> sizes?
>
> --
> Regards,
> Navin
>



-- 
Regards,
Navin


Re: Storm deployment in wide-area network

2017-01-27 Thread Ambud Sharma
Nifi is a better tool for WAN data movement. There is no particular
benefit to deploying Storm facing the internet or to moving data over the
internet with it.


On Jan 25, 2017 9:43 AM, "Fan Jiang"  wrote:

Hi all,

I have a general question - is there any Storm deployment in wide-area
network? If so, what are the typical applications that would benefit from
such deployment?

Thanks,
Fan


Re: problem in running topology in local mode

2017-01-27 Thread Ambud Sharma
Your dependencies need to be overridden. If you are using Maven, please look
at your dependency hierarchy (e.g. mvn dependency:tree), check what is pulling
in slf4j/log4j, and align it with the version of storm you are using.

On Jan 25, 2017 3:35 AM, "sam mohel"  wrote:

is there any help , please ?

On Tue, Jan 24, 2017 at 8:53 AM, sam mohel  wrote:

> I got this error when i ran my topology in local mode
> clojure.tools.logging$eval1$fn__7 invoke
> SEVERE:
> java.lang.NoSuchMethodError: org.slf4j.spi.LocationAwareLog
> ger.log(Lorg/slf4j/Marker;Ljava/lang/String;ILjava/lang/
> String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
> at org.apache.log4j.Category.differentiatedLog(Category.java:186)
> at org.apache.log4j.Category.info(Category.java:229)
> at backtype.storm.utils.ShellProcess.getSerializer(ShellProcess
> .java:78)
> at backtype.storm.utils.ShellProcess.launch(ShellProcess.java:58)
> at backtype.storm.task.ShellBolt.prepare(ShellBolt.java:117)
> at backtype.storm.daemon.executor$fn__3439$fn__3451.invoke(
> executor.clj:699)
> at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461)
> at clojure.lang.AFn.run(AFn.java:24)
> at java.lang.Thread.run(Thread.java:701)
>
>


Re: Is there a Storm hosted/cloud solution somewhere available?

2017-01-24 Thread Ambud Sharma
Hi Andre,

Try Hortonworks Cloudbreak: http://hortonworks.com/apache/cloudbreak/
It provides a nice web interface to spin up clusters on demand based on the
sizing and instance flavors.

It's not hosted, however it is fairly easy to deploy.

On Tue, Jan 24, 2017 at 4:59 PM, Joaquin Menchaca 
wrote:

> cloudera?
>
> On Jan 23, 2017 2:44 AM, "André Martin" 
> wrote:
>
>> Dear Storm users,
>> I was wondering if there is a cloud hosted storm solution somewhere
>> available similar to Amazon EMR for hadoop where you do not have to setup a
>> cluster manually yourself - you only use the web interface to specify the
>> size of your cluster and submit the job/the topology etc.
>> Any help/information is highly appreciated. Thanks,
>>
>> Sincerely,
>>
>>André Martin
>>
>>


Re: deploy bolts to a specific supervisor

2016-12-19 Thread Ambud Sharma
Storm workers are supposed to be identical for the most part. You can tune
things a little by setting an odd number of executors compared to the worker
count.

To ideally accomplish what you are trying to do you can:
1. make these "cpu intensive" bolts register their task ids to zookeeper or
another KV store
2. query this information and use direct grouping to manually route the
message (see the sketch below)

You will still have bolt instances for the "cpu intensive" workload on your
t1s, however they will be idle.
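
A rough sketch of step 2 above, i.e. routing to a specific task id over a direct stream (the zookeeper lookup from step 1 is only hinted at; pickCpuIntensiveTask, the "direct" stream name, and the component names are illustrative):

// Topology wiring: the CPU-intensive bolt subscribes with directGrouping.
builder.setBolt("cpu-intensive", new CpuIntensiveBolt(), 8)
       .directGrouping("router", "direct");

// Inside the routing bolt:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // A stream must be declared as direct before emitDirect() can target it.
    declarer.declareStream("direct", true, new Fields("payload"));
}

@Override
public void execute(Tuple input) {
    // Step 1 registered the task ids in zookeeper; look one up here.
    int taskId = pickCpuIntensiveTask(input);
    collector.emitDirect(taskId, "direct", input, new Values(input.getValue(0)));
    collector.ack(input);
}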

On Thu, Nov 24, 2016 at 1:56 PM, Ohad Edelstein  wrote:

> Hey,
>
> We are running a storm application using a single type on instance in AWS
> and a single topology to run our system.
>
> This is causing some resource limitation issues. The way we want to
> address this is by splitting our IO intense bolts into a cluster of a few
> dozens/hundred  t1.small machines (for example) and all our CPU intense
> bolts to a few large machines with lots of cpu & memory.
>
> Basically what i am asking is, is there a way to start all this
> supervisors and then deploy one topology that include cpu intense bolts on
> the big machines and to the small machines the deploy IO bolts?
>
> All documentation I see, is relevant to version 0.9.x
>
> Thanks!
>


Re: Support of topic wildcards in Kafka spout

2016-12-19 Thread Ambud Sharma
No, this is currently not supported. Please open a feature request:
https://issues.apache.org/jira/browse/STORM/ so we can vote on it from a
community perspective and see if others would be interested in developing
this feature.

On Tue, Nov 22, 2016 at 5:53 AM, Wijekoon, Manusha <
manusha.wijek...@citi.com> wrote:

> We have a topology that has multiple kafka spout tasks. Each spout task is
> supposed to read a subset of messages from a set of Kafka topics. Topics
> have to be subscribed using a wild card such as AAA.BBB.*. The expected
> behaviour would be that all spout tasks collectively will consume all
> messages in all of the topics that match the wild card. Each message is
> only routed to a single spout task (Ignore failure scenarios). Is this
> currently supported?
>


Re: Apache Storm 1.0.2 integration with ElasticSearch 5.0.0

2016-12-19 Thread Ambud Sharma
That is a wire-level protocol incompatibility with ES, or zen discovery is
disabled, or the nodes are not reachable in Elasticsearch.

On Mon, Nov 28, 2016 at 7:21 PM, Zhechao Ma 
wrote:

> As far as I know, storm-elasticsearch still doesn't support elasticsearch
> 5.0. You can use Elasticsearch-Hadoop 5.x instead, which provides the class
> EsBolt for writing data from storm to elasticsearch.
>
> https://github.com/elastic/elasticsearch-hadoop
>
> 2016-11-29 10:48 GMT+08:00 Abhishek Samuel :
>
>> Hi,
>>
>> I was writing a bolt to write to elasticsearch 5.0.0 running locally
>> using the examples provided here http://storm.apache.org/r
>> eleases/1.0.2/storm-elasticsearch.html
>>
>> Am using the maven dependency
>>
>> <dependency>
>>     <groupId>org.apache.storm</groupId>
>>     <artifactId>storm-elasticsearch</artifactId>
>>     <version>1.0.2</version>
>> </dependency>
>>
>> The code compiled, however at runtime i was getting the below error.
>> Could you please advise on what the issue could be ?
>>
>> Thanks and Regards
>> Abhishek Samuel
>>
>> *On the elasticsearch server terminal*
>>
>> [2016-11-28T18:17:58,810][WARN ][o.e.t.n.Netty4Transport  ] [ZCyigtO]
>> exception caught on transport layer [[id: 0x2f3da3a0, L:/127.0.0.1:9300 -
>> R:/127.0.0.1:52299]], closing connection
>>
>> java.lang.IllegalStateException: Received message from unsupported
>> version: [1.0.0] minimal compatible version is: [5.0.0]
>>
>> at 
>> org.elasticsearch.transport.TcpTransport.messageReceived(TcpTransport.java:1199)
>> ~[elasticsearch-5.0.0.jar:5.0.0]
>>
>> at org.elasticsearch.transport.netty4.Netty4MessageChannelHandl
>> er.channelRead(Netty4MessageChannelHandler.java:74)
>> ~[transport-netty4-5.0.0.jar:5.0.0]
>>
>>
>> *On the java runtime terminal*
>>
>>
>> 8262 [Thread-14-friends-es-bolt-executor[2 2]] ERROR o.a.s.d.executor -
>>
>> org.elasticsearch.client.transport.NoNodeAvailableException: None of the
>> configured nodes are available: []
>>
>> at org.elasticsearch.client.transport.TransportClientNodesServi
>> ce.ensureNodesAreAvailable(TransportClientNodesService.java:305)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at org.elasticsearch.client.transport.TransportClientNodesServi
>> ce.execute(TransportClientNodesService.java:200)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at org.elasticsearch.client.transport.support.InternalTransport
>> Client.execute(InternalTransportClient.java:106)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.client.support.AbstractClient.index(AbstractClient.java:102)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.client.transport.TransportClient.index(TransportClient.java:340)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.action.index.IndexRequestBuilder.doExecute(IndexRequestBuilder.java:266)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:91)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:65)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.apache.storm.elasticsearch.bolt.EsIndexBolt.execute(EsIndexBolt.java:64)
>> [storm-elasticsearch-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.daemon.executor$fn__8058$tuple_action_fn__8060.invoke(executor.clj:731)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.daemon.executor$fn__8058$fn__8071$fn__8124.invoke(executor.clj:850)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>
>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
>>
>
>
>
> --
> Thanks
> Zhechao Ma
>


Re: Clarity on external/storm-kafka-client

2016-12-19 Thread Ambud Sharma
The storm-external project has the Kafka spouts and bolts. Storm doesn't
directly control compatibility with Kafka; that being said, the default
versions of the Kafka integrations will work according to your list, so the
answer is yes about the "compatibility".

However, it's still possible to use an older version of Kafka with a newer
version of Storm and vice versa, by either implementing your own spouts /
bolts or carefully working with selective dependency management in your
topology.



On Wed, Nov 30, 2016 at 1:12 PM, Kristopher Kane 
wrote:

> I need some clarification on Kafka broker version support across Storm
> branches.
>
> storm-kafka-client uses the 'new' Kafka APIs
>
>
> The master branch README of storm-kafka-client says that it ONLY supports
> Kafka version .10 and later.
>
> The 1.x branch README of storm-kafka-client says that it ONLY supports
> Kafka version .10 and later.
>
> The 1.0.x branch README of storm-kafka-client says that it ONLY supports
> Kafka version .9 and later.
>
>
> Is it really true that 1.x and later only support Kafka version .10 and
> later or that it was tested on Kafka .10 but since it is the new API really
> still works on .9 as well  and the wording was never changed in the README?
>
> Thanks,
>
> Kris
>


Re: Does the bolt in between have the ability to re-emit a failed tuple?

2016-12-19 Thread Ambud Sharma
Replaying of tuples is done from the Spout and not done on a point-to-point
basis like Apache Flume.

Either a tuple is completely processed, i.e. acked by every single bolt in
the pipeline, or it's not; if it's not then it will be replayed by the Spout
(if the Spout implements replay logic when the fail() method is called).

Acking has to be done at every stage of the pipeline.

Here's a post that explains the necessary details of the XOR
ledger: https://bryantsai.com/fault-tolerant-message-processing-in-storm/
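
To make the replay path concrete, a minimal sketch of a spout that keeps emitted tuples in a pending map and re-emits them from fail() (class and helper names are illustrative; a production spout would also cap retries):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReplayingSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final Map<Long, Values> pending = new ConcurrentHashMap<>();
    private long nextId = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Values values = fetchNextEvent();  // hypothetical read from the source
        if (values == null) { return; }
        long msgId = nextId++;
        pending.put(msgId, values);
        // Emitting with a message id enables ack/fail callbacks for this tuple tree.
        collector.emit(values, msgId);
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);  // fully processed by every bolt downstream
    }

    @Override
    public void fail(Object msgId) {
        Values values = pending.get(msgId);
        if (values != null) {
            collector.emit(values, msgId);  // replay the failed tuple
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("event"));
    }

    private Values fetchNextEvent() { return null; }  // placeholder
}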

On Wed, Nov 30, 2016 at 9:10 PM, Navin Ipe 
wrote:

> Apart from the previous question, there's also the question of whether we
> should ack the tuple in execute() of Bolt1? Or is it just sufficient to ack
> it in Bolt2?
>
>
> On Wed, Nov 30, 2016 at 12:28 PM, Navin Ipe  com> wrote:
>
>> Hi,
>>
>> Just need a confirmation for this topology:
>>
>> *Spout* ---emit---> *Bolt1* ---emit---> *Bolt2*
>>
>> Spout is BaseRichSpout. Bolt1 and Bolt2 are BaseRichBolt.
>> Spout emits just one tuple per nextTuple() call.
>> Bolt1 anchors to the tuple it received from Spout and emits many
>> different tuple objects.
>>
>> If any of the emits of Bolt1 fails, is there no way for Bolt 1 to re-emit
>> the tuple? Do I have to wait for the topology to figure out that one of
>> Bolt1's tuples failed and then do a re-emit from the Spout?
>>
>> --
>> Regards,
>> Navin
>>
>
>
>
> --
> Regards,
> Navin
>


Re: Storm blobstore expiry

2016-12-19 Thread Ambud Sharma
What type of Blobstore is it?



On Thu, Dec 1, 2016 at 1:57 AM, Mostafa Gomaa  wrote:

> Hello All,
>
> I am using the storm blobstore to store some dynamic dictionaries, however
> I found that the blobstore had been cleared when I restarted the machine.
> Is there a way to make the blobstore non-volatile?
>
> Thanks
>


Re: help on Consuming the data from SSL enabled 0.9 kafka topic

2016-12-19 Thread Ambud Sharma
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_storm-user-guide/content/stormkafka-secure-config.html
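
Not a complete sample, but as a sketch: the same SSL settings used with kafka-avro-console-consumer are passed straight through to the spout's underlying KafkaConsumer. The keys below are standard Kafka 0.9+ client properties (the Avro deserializer assumes the Confluent schema registry client); hostnames, paths, and passwords are placeholders, and how the map is handed over depends on which spout/builder version you use:

import java.util.HashMap;
import java.util.Map;

Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", "broker1:9093");
props.put("group.id", "my-storm-consumer");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/etc/security/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "changeit");
props.put("ssl.keystore.location", "/etc/security/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "changeit");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://schema-registry:8081");
// These properties are then supplied as the consumer configuration of the
// storm-kafka-client spout (e.g. via its KafkaSpoutConfig builder).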

On Mon, Dec 19, 2016 at 4:49 PM, Ambud Sharma <asharma52...@gmail.com>
wrote:

> Not sure if this helps:
>
> On Thu, Dec 1, 2016 at 4:11 AM, Srinivas.Veerabomma <
> srinivas.veerabo...@target.com> wrote:
>
>> Hi,
>>
>>
>>
>> I need some help. Basically looking for some sample Storm code or some
>> suggestions.
>>
>>
>>
>> My requirement is to develop a code in Apache Storm latest version to
>> consume the data from Kafka topic (Kafka version : “0.9.0.1”). This
>> kafka topic is SSL enabled and data is in Avro format. I was able to run
>> the command – “kafka-avro-console-consumer” at the command line and consume
>> the data.
>>
>>
>>
>> I also used the Kafka API code – “KafkaConsumer<String, Object> consumer =
>> new KafkaConsumer<String, Object>(props);” and was able to consume the data.
>> In both cases, I provided the required details such as:
>>
>>
>>
>> bootstrap.servers
>>
>> group.id
>>
>> client.id
>>
>> security.protocol
>>
>> ssl.protocol
>>
>> ssl.truststore.location
>>
>> ssl.truststore.password
>>
>> ssl.keystore.location
>>
>> ssl.keystore.password
>>
>> key.deserializer
>>
>> value.deserializer
>>
>> schema.registry.url
>>
>>
>>
>>
>>
>> With the above two options (using Command Line and using Kafka API), I was 
>> able to connect to SSL enabled Kafka topic and read the Avro data and 
>> de-serialize it successfully. I want to achieve the same thing using Apache 
>> Storm code. I did some research and unable to find any sample code. Please 
>> let me know if you have any sample code or what Storm Class to use to 
>> achieve this. Thanks.
>>
>>
>>
>>
>>
>> Regards
>>
>> Srini Veerabomma | Big Data Business Intelligence, EDABI COE | Target
>> India | GWS Center | Bangalore | Mobile : +91-94825-04323
>>
>>
>>
>
>


Re: help on Consuming the data from SSL enabled 0.9 kafka topic

2016-12-19 Thread Ambud Sharma
Not sure if this helps:

On Thu, Dec 1, 2016 at 4:11 AM, Srinivas.Veerabomma <
srinivas.veerabo...@target.com> wrote:

> Hi,
>
>
>
> I need some help. Basically looking for some sample Storm code or some
> suggestions.
>
>
>
> My requirement is to develop a code in Apache Storm latest version to
> consume the data from Kafka topic (Kafka version : “0.9.0.1”). This kafka
> topic is SSL enabled and data is in Avro format. I was able to run the
> command – “kafka-avro-console-consumer” at the command line and consume the
> data.
>
>
>
> I also used the Kafka API code – “KafkaConsumer<String, Object> consumer =
> new KafkaConsumer<String, Object>(props);” and was able to consume the data.
> In both cases, I provided the required details such as:
>
>
>
> bootstrap.servers
>
> group.id
>
> client.id
>
> security.protocol
>
> ssl.protocol
>
> ssl.truststore.location
>
> ssl.truststore.password
>
> ssl.keystore.location
>
> ssl.keystore.password
>
> key.deserializer
>
> value.deserializer
>
> schema.registry.url
>
>
>
>
>
> With the above two options (using Command Line and using Kafka API), I was 
> able to connect to SSL enabled Kafka topic and read the Avro data and 
> de-serialize it successfully. I want to achieve the same thing using Apache 
> Storm code. I did some research and unable to find any sample code. Please 
> let me know if you have any sample code or what Storm Class to use to achieve 
> this. Thanks.
>
>
>
>
>
> Regards
>
> Srini Veerabomma | Big Data Business Intelligence, EDABI COE | Target
> India | GWS Center | Bangalore | Mobile : +91-94825-04323
>
>
>


Re: topology.debug always true

2016-12-19 Thread Ambud Sharma
Yes, your reasoning is correct. The topology is overriding the debug
configuration; Storm allows a topology to override all of the
topology-specific settings.
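
As a concrete illustration, the topology-side value comes from the Config submitted with the topology, e.g. (builder being the usual TopologyBuilder):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;

Config conf = new Config();
// This overrides topology.debug from the cluster's storm.yaml for this topology only.
conf.setDebug(false);
StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());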

On Fri, Dec 2, 2016 at 3:41 AM, Mostafa Gomaa  wrote:

> I think nimbus configuration is what you have on your nimbus machine,
> "topology.debug" was set to false over there. Topology config are config
> values that start with the "topology." prefix in the yaml file. Your code
> was overriding the debug setting for the topology, so you were seeing two
> different values. I'm not too sure if my reasoning is correct though.
>
> On Dec 2, 2016 1:33 PM, "Ohad Edelstein"  wrote:
>
>> I am embarrassed to say that you are right… thanks!
>> But what is the difference between the two?
>> Thanks
>>
>> From: Mostafa Gomaa 
>> Reply-To: "user@storm.apache.org" 
>> Date: Friday, 2 December 2016 at 13:03
>> To: "user@storm.apache.org" 
>> Cc: "d...@storm.apache.org" 
>> Subject: Re: topology.debug always true
>>
>> Are you sure you're not overriding this configuration in code when
>> building the topology?
>>
>> On Dec 1, 2016 10:07 PM, "Ohad Edelstein"  wrote:
>>
>>> We see that the topology.debug set to two different values:
>>>
>>> And:
>>> We set the topology.debug to false ( we know its set that way to default
>>> but we thought that, this was the problem ).
>>> Is there a different meaning to the param in this two screens?
>>>
>>>
>>>


Re: Worker's Behavior With Heap Limit

2016-12-19 Thread Ambud Sharma
LRU caches are an effective memory management technique for Storm bolts if
lookup is what you are trying to do; however, if you are doing in-memory
aggregations, I highly recommend sticking with standard Java maps and then
checkpointing state to an external data store (hbase, redis, etc.). A
time-bounded cache sketch follows below.

Note:
* Storm aggregation MUST be time bound, else you will run out of memory
* In the Storm community we are trying to use Caffeine for Storm instead of
Guava.
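
As a sketch of such a time-bounded lookup cache (Guava shown here; Caffeine has a near-identical builder), the sizes, TTL, and the loadFromExternalStore helper are illustrative:

import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Bounded both by entry count and by age, so worker heap stays predictable.
Cache<String, String> lookupCache = CacheBuilder.newBuilder()
        .maximumSize(100_000)
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .build();

// Inside the bolt's execute():
String key = "user-123";
String cached = lookupCache.getIfPresent(key);
if (cached == null) {
    cached = loadFromExternalStore(key);  // hypothetical helper
    lookupCache.put(key, cached);
}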

On Thu, Dec 8, 2016 at 1:53 AM, Matt Lowe  wrote:

> Depending on what your memory needs to do you might find Guava useful:
> https://github.com/google/guava/wiki/CachesExplained
>
> Its a time/access based cache which will remove elements if they time out.
>
> Its all dependant on what your business logic is though.
>
>
> On 8 December 2016 at 10:48:17, Eranga Heshan (eranga@gmail.com)
> wrote:
>
> I think you are correct because I ran "free -m" command on the terminal
> and saw that free memory was rapidly decreasing. When it was near empty,
> the worker was killed. Then a new worker was started with a new PID.
>
> And yeah, there is a lot of data cached in memory waiting to be executed
> or waiting to be sent over the network. Probably that would be the issue.
>
> I have not heard of Guava caching technique. Instead of caching, I think
> of extending the memory of the server so I could extend the heap assignment
> to the worker.
>
> Thanks,
> Regards,
>
>
>
> Eranga Heshan
> *Undergraduate*
> Computer Science & Engineering
> University of Moratuwa
> Mobile:  +94 71 138 2686
> Email: era...@wso2.com 
> 
> 
> 
>
> On Thu, Dec 8, 2016 at 2:42 PM, Matt Lowe 
> wrote:
>
>> My guess is the following:
>>
>> 1. Memory limit is reached within worker JVM
>> 2. Worker process starts to throw OutOfMemoryErrors (though this could be
>> silent and not in the logs)
>> 3. JVM will terminate due to lack of memory
>> 4. Supervisor will start a new JVM with new process Id and start a worker
>>
>> This will happen over and over. As storm is a Highly Available service, a
>> new worker will be added if one should die.
>>
>> I assume you are caching a lot of data in your bolt to reach this limit?
>> Have you tried using Guava cache?
>>
>> //Matt
>>
>>
>> On 8 December 2016 at 09:19:43, Matt Lowe (matthewneill...@gmail.com)
>> wrote:
>>
>> Hello Eranga,
>>
>> Do you have the worker/supervisor logs when the worker dies?
>>
>> //Matt
>>
>>
>> On 8 December 2016 at 03:07:39, Eranga Heshan (eranga@gmail.com)
>> wrote:
>>
>> Hi all,
>>
>> In Storm, I configured the worker's maximum heap size to 1900MB. Then I
>> carefully monitored what happens when it reaches the limit.
>>
>> The worker is killed and a new worker is spawned with another process id.
>> Is my observation correct? If so why is that happening?
>>
>> Thanks,
>> Regards,
>>
>>
>> Eranga Heshan
>> *Undergraduate*
>> Computer Science & Engineering
>> University of Moratuwa
>> Mobile:  +94 71 138 2686
>> Email: era...@wso2.com 
>> 
>> 
>> 
>>
>>
>


Re: How to send multi tick tuple?

2016-12-19 Thread Ambud Sharma
Counting ticks with the modulo operator is the ideal way to do it.

Here's an example for you:
https://github.com/Symantec/hendrix/blob/current/hendrix-alerts/src/main/java/io/symcpe/hendrix/alerts/SuppressionBolt.java#L136


Some slides explaining what's going on:
http://www.slideshare.net/HadoopSummit/in-flux-limiting-for-a-multitenant-logging-service


Hope this helps!
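
A rough sketch of that modulo approach with a single 1-second tick (TupleUtils.isTick is available in recent Storm releases; on older versions compare the tuple's source component/stream against Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID; the runEvery* helpers are placeholders):

// Fragment of a bolt; assumes org.apache.storm.Config and
// org.apache.storm.utils.TupleUtils are on the classpath.
private int tickCounter = 0;

@Override
public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<>();
    // One tick tuple per second delivered to this bolt.
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
    return conf;
}

@Override
public void execute(Tuple tuple) {
    if (TupleUtils.isTick(tuple)) {
        tickCounter++;
        runEverySecond();
        if (tickCounter % 10 == 0) { runEveryTenSeconds(); }
        if (tickCounter % 60 == 0) { runEveryMinute(); }
        collector.ack(tuple);
        return;
    }
    // ... normal tuple processing ...
    collector.ack(tuple);
}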

On Sun, Dec 18, 2016 at 7:15 PM, Chen Junfeng  wrote:

> Currently I use
>
> conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,1);
>
>
>
> to create timer task where it sends a timer tick tuple every one seconds
> as I set. .
>
>
>
> But now I need to create multiple timer tasks; for instance, I need to be
> reminded every one second, ten seconds, and one minute in one topology. How
> should I do it?
>
>
>
> Thanks!
>
>
>
> Junfeng Chen
>
>
>
>
>


Re: ack in downstream when using all grouping method

2016-12-19 Thread Ambud Sharma
Storm is a framework built on replays; fundamentally, replays are the way
guaranteed event processing is accomplished. Typically all bolt instances
of a given registered bolt should be running the same code, unless you are
doing some logic based on task ids. This implies that the behavior of bolt
instances should be similar as well, unless they experience a hardware failure.

If I am understanding your use case, you can duplicate the data
outside storm (like writing it to separate kafka topics) and have independent
spouts pick it up while keeping everything in one topology.

Grouping, however, is applied to one stream; you can have more than one
stream to have a logical separation as well.

I am still unsure about why you would get partial failures unless it's
frequent supervisor failure; maybe you can provide more details about your
use case.

Lastly, ALL grouping is usually used for update delivery, where
acknowledgements should matter; however, if you can get away with using
unanchored tuples then that is also an alternative.


On Dec 19, 2016 4:17 PM, "Xunyun Liu" <xunyun...@gmail.com> wrote:

Thank you for your answer, Ambud. My use case is that only some of the bolt
instances are critical that I need them responding to the signal through
proper acknowledgment. However, the rest of them are non-critical which are
preferably not to interfere the normal ack process, much like receiving an
unanchored tuple. Is there any way that I can achieve this?

On 20 December 2016 at 11:01, Ambud Sharma <asharma52...@gmail.com> wrote:

> Forgot to answer your specific question. Storm message id is internal and
> will be different so you will see a duplicate tuple with a different id.
>
> On Dec 19, 2016 3:59 PM, "Ambud Sharma" <asharma52...@gmail.com> wrote:
>
>> Yes that is correct. All downstream tuples must be processed for the root
>> tuple to be acknowledged.
>>
>> Type of grouping does not change the acking behavior.
>>
>> On Dec 19, 2016 3:53 PM, "Xunyun Liu" <xunyun...@gmail.com> wrote:
>>
>>> Hi there,
>>>
>>> As some grouping methods allow sending multiple copies of emitted data
>>> to downstream bolt instances, I was wondering what will happen if any one
>>> of them is not able to ack the tuple due to failures. The intrinsic
>>> question is that, when the all grouping method is used, whether the
>>> recipients are receiving the exact the same tuple or just duplications with
>>> different tuple IDs. In the latter case, I believe the tuple tree is
>>> expanded with regard to the number of parallelisms in downstream and each
>>> task has to invoke ack() for the root tuple to be fully processed.
>>>
>>> Any idea is much appreciated.
>>>
>>>
>>> --
>>> Best Regards.
>>> ==
>>> Xunyun Liu
>>> ​​
>>>
>>>


-- 
Best Regards.
==
Xunyun Liu
The Cloud Computing and Distributed Systems (CLOUDS) Laboratory,
The University of Melbourne


Re: ack in downstream when using all grouping method

2016-12-19 Thread Ambud Sharma
Forgot to answer your specific question. Storm message id is internal and
will be different so you will see a duplicate tuple with a different id.

On Dec 19, 2016 3:59 PM, "Ambud Sharma" <asharma52...@gmail.com> wrote:

> Yes that is correct. All downstream tuples must be processed for the root
> tuple to be acknowledged.
>
> Type of grouping does not change the acking behavior.
>
> On Dec 19, 2016 3:53 PM, "Xunyun Liu" <xunyun...@gmail.com> wrote:
>
>> Hi there,
>>
>> As some grouping methods allow sending multiple copies of emitted data to
>> downstream bolt instances, I was wondering what will happen if any one of
>> them is not able to ack the tuple due to failures. The intrinsic question
>> is that, when the all grouping method is used, whether the recipients are
>> receiving the exact the same tuple or just duplications with different
>> tuple IDs. In the latter case, I believe the tuple tree is expanded with
>> regard to the number of parallelisms in downstream and each task has to
>> invoke ack() for the root tuple to be fully processed.
>>
>> Any idea is much appreciated.
>>
>>
>> --
>> Best Regards.
>> ==
>> Xunyun Liu
>> ​​
>>
>>


Re: ack in downstream when using all grouping method

2016-12-19 Thread Ambud Sharma
Yes that is correct. All downstream tuples must be processed for the root
tuple to be acknowledged.

Type of grouping does not change the acking behavior.

On Dec 19, 2016 3:53 PM, "Xunyun Liu"  wrote:

> Hi there,
>
> As some grouping methods allow sending multiple copies of emitted data to
> downstream bolt instances, I was wondering what will happen if any one of
> them is not able to ack the tuple due to failures. The intrinsic question
> is that, when the all grouping method is used, whether the recipients are
> receiving the exact the same tuple or just duplications with different
> tuple IDs. In the latter case, I believe the tuple tree is expanded with
> regard to the number of parallelisms in downstream and each task has to
> invoke ack() for the root tuple to be fully processed.
>
> Any idea is much appreciated.
>
>
> --
> Best Regards.
> ==
> Xunyun Liu
> ​​
>
>


Re: Containerising Storm

2016-11-22 Thread Ambud Sharma
https://github.com/Symantec/hendrix/blob/current/install/scripts/local-build-install-docker.sh

Here's a docker compose way of doing it. Zookeeper in this case is not
scalable.

I had to rebuild some of the images for Storm to fix issues with
environment variables and DNS lookups. Those images should work with
kubernetes as well.

On Nov 22, 2016 9:48 AM, "Cuneo, Nicholas"  wrote:

> We tried doing the same but ran into deployment issues with storm workers
> being deployed on separate docker instances. I wasn't directly involved so
> I can't give much details, I just know that storm is no longer
> containerized on our end.
>
> ---
> Sent from Boxer | http://getboxer.com 
>
> On November 22, 2016 at 9:08:53 AM PST, Al Hennessey <
> alhennesse...@gmail.com> wrote:
>
> Hi
> I am looking to containerise my storm setup so I can run it across
> services like kubernetes or mesos. I can't find an official docker release
> and wondering what you guys recommend or whether it's easier just to
> package one myself.
>
> Thanks for the help
>
>
> --
>
> This e-mail contains privileged and confidential information intended for
> the use of the addressees named above. If you are not the intended
> recipient of this e-mail, you are hereby notified that you must not
> disseminate, copy or take any action in respect of any information
> contained in it. If you have received this e-mail in error, please notify
> the sender immediately by e-mail and immediately destroy this e-mail and
> its attachments.
>


Re: Cant connect to remote cluster

2016-11-18 Thread Ambud Sharma
Please use IRC for this. Mailing list is not for Instant Messaging.

On Nov 18, 2016 2:21 PM, "Mostafa Gomaa"  wrote:

Also is nimbus by any chance hosted on AWS or Azure?

On Nov 19, 2016 12:18 AM, "Ohad Edelstein"  wrote:

> Try add the zookeeper part to the client storm.yaml
>
> I think that the request is searching for the zookeeper to pass the
> message and fails
>
> From: Al Hennessey 
> Reply-To: "user@storm.apache.org" 
> Date: Saturday, 19 November 2016 at 0:13
> To: "user@storm.apache.org" 
> Subject: Re: Cant connect to remote cluster
>
> Yh i ran the same command from the nimbus machine and it worked fine
>
> On 18 Nov 2016, at 22:12, Ohad Edelstein  wrote:
>
> Did you try running the command, from the same machine?
>
> From: Al Hennessey 
> Reply-To: "user@storm.apache.org" 
> Date: Saturday, 19 November 2016 at 0:10
> To: "user@storm.apache.org" 
> Subject: Re: Cant connect to remote cluster
>
> Hi, i thanks for the reply, i added the ip to my hosts file with a domain
> name, but it still did not work and threw the same error
> Just to let you know that i am on a mac
>
> Thanks for the help
>
> On 18 Nov 2016, at 22:01, sam mohel  wrote:
>
> I'm new to storm but let's try this and I hope it will work.
> Check the file which has the IP definitions.
> It exists at this path:
> /etc/hosts  and it should contain entries like: 127.0.x.x mydomain
> 1- Check whether 130.211.53.188 has a domain name or already exists there, or
> 2- If you found it, try to use the domain name instead of the IP address
> On Friday, November 18, 2016, Al Hennessey 
> wrote:
>
>> Hi, i am having an issue connecting to my remote clustered storm server,
>> when i try running “storm list” i get this error
>>
>> Exception in thread "main" java.lang.RuntimeException:
>> org.apache.storm.thrift.transport.TTransportException:
>> java.net.UnknownHostException: nimbus-dev.c.epic-dev.internal
>> at org.apache.storm.security.auth.TBackoffConnect.retryNext(TBa
>> ckoffConnect.java:64)
>> at org.apache.storm.security.auth.TBackoffConnect.doConnectWith
>> Retry(TBackoffConnect.java:56)
>> at org.apache.storm.security.auth.ThriftClient.reconnect(Thrift
>> Client.java:99)
>> at org.apache.storm.security.auth.ThriftClient.(ThriftCli
>> ent.java:69)
>> at org.apache.storm.utils.NimbusClient.(NimbusClient.java:106)
>> at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(Ni
>> mbusClient.java:78)
>> at org.apache.storm.command.list$_main.invoke(list.clj:22)
>> at clojure.lang.AFn.applyToHelper(AFn.java:152)
>> at clojure.lang.AFn.applyTo(AFn.java:144)
>> at org.apache.storm.command.list.main(Unknown Source)
>> Caused by: org.apache.storm.thrift.transport.TTransportException:
>> java.net.UnknownHostException: nimbus-dev.c.epic-dev.internal
>> at org.apache.storm.thrift.transport.TSocket.open(TSocket.java:226)
>> at org.apache.storm.thrift.transport.TFramedTransport.open(TFra
>> medTransport.java:81)
>> at org.apache.storm.security.auth.SimpleTransportPlugin.connect
>> (SimpleTransportPlugin.java:103)
>> at org.apache.storm.security.auth.TBackoffConnect.doConnectWith
>> Retry(TBackoffConnect.java:53)
>> ... 8 more
>> Caused by: java.net.UnknownHostException: nimbus-dev.c.epic-dev.internal
>> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocket
>> Impl.java:184)
>> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
>> at java.net.Socket.connect(Socket.java:589)
>> at org.apache.storm.thrift.transport.TSocket.open(TSocket.java:221)
>>
>>
>> Here is my nimbus storm.yaml
>>
>> nimbus.seeds:
>>   - "130.211.53.188"
>> nimbus.thrift.port: 6627
>> storm.local.dir: /var/storm-dir
>> storm.zookeeper.servers:
>>   - "104.199.7.70"
>> ui.host: 0.0.0.0
>> ui.port: 8080
>> supervisor.slots.ports:
>>   - 6700
>>   - 6701
>>   - 6702
>>   - 6703
>>
>> My client storm uses the same nimbus host line:
>>
>> nimbus.seeds:
>>   - “130.211.53.188”
>>
>> Not sure why i can’t connect, i am running on google cloud platform and
>> all my ports are open so it should be able to talk to each other.
>>
>> Thanks for the help
>>
>>
>>
>
>


Re: Is it possible to emit different number of OutputFields

2016-10-17 Thread Ambud Sharma
You should declare 2 streams. Bolt configuration can't be passed to the
declareOutputFields method call.
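
A minimal sketch of the two-stream approach (stream and field names are placeholders, and condition() is a hypothetical helper):

// Fragment of a bolt using two declared streams.
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("stream1", new Fields("abc"));
    declarer.declareStream("stream2", new Fields("xyz", "qqq"));
}

@Override
public void execute(Tuple input) {
    if (condition(input)) {
        collector.emit("stream1", input, new Values("a"));
    } else {
        collector.emit("stream2", input, new Values("x", "q"));
    }
    collector.ack(input);
}

// Downstream bolts subscribe only to the stream they care about:
// builder.setBolt("b1", new Bolt1()).shuffleGrouping("this-bolt", "stream1");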

On Oct 16, 2016 1:47 PM, "Darsh"  wrote:

> Is it possible to emit different numbers of output fields from a storm bolt?
>
> For ex:
>
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>  if(…)
>  declarer.declare(new Fields(“ABC"));
>  if(…)
>  declarer.declare(new Fields(“XYZ”), new Fields(“QQQ");
>   }
>
>
>
> Or do I have to create 2 different streams ? Something like,
>
> outputFieldsDeclarer.declareStream("stream1", new Fields("field1"));
>
> outputFieldsDeclarer.declareStream("stream2", new Fields("field2"));
>
>
>
>
>
> Thanks
>
> Darsh
>


Re: Issue with Storm 1.0.2 on docker

2016-10-14 Thread Ambud Sharma
TLDR; here's a working compose template for docker storm with Kafka and zk.
Please remove the project-specific components and you should be able to run
the whole thing with scaling:

https://github.com/Symantec/hendrix/blob/current/install/conf/local/docker-compose-backup.yml

On Oct 13, 2016 9:26 PM, "Joaquin Menchaca"  wrote:

> I am a little fuzzy about what you are trying to do.  Did you expose the
> correct ports (Dockerfile) and map them to the host (docker command
> option)?  6127, 8000, 8080?   Are you trying to run storm jar  and storm
> list inside the container, or outside the container.   Are you trying to do
> this on one container?  Or multiple?
>
> If you are doing everything inside the container, the mini-vm style, then
> you really do not need to configure storm.yaml, it'll default to localhost
> for local mode.  You'll want to get into storm.yaml when you want to setup
> a cluster, and when you want to submit a topology to a cluster.
>
> For cluster services, you'll want to use multiple containers (at least for
> supervisors).  There may be a challenge for as multiple supervisors using
> the same ports.  Docker-Compose and Ansible could be easier tools to use
> for orchestrating multiple containers, rather than docker commands.
>
> In a cluster, you can use the storm ui and logviewer to drill down into
> the bolts and such. In this case, you would definitely need to control how
> these containers are labeled, by controlling the /etc/hosts.
> Docker-Compose can do some of this automatically.  Then to the outside,
> like a web browser, you'll need to map the hostname, e.g. nimbus.dev,
> supervisor1.dev, etc., so that they are the same internally and
> externally.  You can do this with dnscache or something on the host (or
> regular DNS server for staging), and hosts files for the storm containers
> internally, plus one container that acts as a reverse-proxy, virtual host
> for nimbus.dev, supervisor1.dev, etc., which then routes to appropriate
> container on 8000 and 8080.
>
>
>
>
> On Tue, Oct 11, 2016 at 9:08 PM, Sudha Subramanian 
> wrote:
>
>> Hi,
>>
>> I'm running Storm 1.0.2 components in docker container all on the same VM.
>>
>> I'm running into an issue where based on the configuration of storm.yaml
>> and container host name, I can either get the storm command line to work,
>> or the Storm UI to connect But, not both.
>>
>> Following configuration allows me to use storm command line and perform
>> commands like list and jar.
>>
>> Storm.yaml:
>>
>>   storm.zookeeper.servers:
>>
>> - 172.x.x.x
>>  nimbus.seeds: ["*104.10.x.x*"]
>>
>>>
>>
>>  Where, 172.x.x.x -> is the docker IP address where zookeeper runs
>>
>> and   104.10.x.x -> is the external IP address of the VM.
>>
>>I run docker with -h option as follows: *docker -h 104.10.x.x *storm
>> nimbus
>>
>>   With the above setting, I'm able to perform operations remotely using
>> storm ( eg storm list). However, I'm unable to connect to storm UI using
>> browser. The browser spins forever trying to get cluster information.
>>
>> Following configuration allows me to connect using the UI but not storm
>> jar.
>>
>> storm.yaml:
>>   storm.zookeeper.servers:
>>   - 172.x.x.x.
>>   nimbus.seeds: ["1*27.0.0.1*"]
>>
>> Nimbus seeds point to 127.0.0.1.
>> And, I *don't run docker with the -h option*.
>>
>> I tried specifying storm.local.hostname etc. But, I'm not able to get
>> Storm 1.0 work in container.
>>
>> I lost almost 4 days trying different configuration. Can someone help
>> me?, please.
>>
>> Thanks,
>> Sudha
>>
>>
>>
>
>
> --
>
> 是故勝兵先勝而後求戰,敗兵先戰而後求勝。
>


Re: Submitting Topology Questions (2 small questions)

2016-09-21 Thread Ambud Sharma
Just the nimbus address and port
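
For example, a client-side storm.yaml along these lines is usually enough for storm jar / storm list against a remote cluster (the hostname is a placeholder, and the port line is only needed if it differs from the default 6627):

nimbus.seeds:
  - "nimbus.example.com"
nimbus.thrift.port: 6627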

On Sep 21, 2016 6:50 PM, "Joaquin Menchaca"  wrote:

> What is the minimal storm.yaml configuration I need for `storm jar ...
> remote`?
>
> Is there a command line option or another way to specify a locally crafted
> storm.yaml, e.g. /tmp/storm.yaml?
>
> --
>
> 是故勝兵先勝而後求戰,敗兵先戰而後求勝。
>


Re: Large number of very small streams

2016-09-21 Thread Ambud Sharma
Two solutions:

1. You can group users by some sort of classification and create topics
based on that; then for each user the consumer can check whether it's
interested in the topic and consume or reject the messages.

2. If each user writes a lot of data, then you can use the concept of a
key-based custom partitioner (see the sketch below).

For consumption, depending on write volume and velocity, you may want to
consider using a database for viewing events per user.
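
A sketch of option 2 above: keying the Kafka producer by user id so that one topic suffices and all of a user's events land in the same partition (topic name, serializers, and variables are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String userId = "user-42";
String payloadJson = "{\"event\":\"click\"}";
// The default partitioner hashes the key, so the same userId always maps to
// the same partition; a custom Partitioner can be plugged in if needed.
producer.send(new ProducerRecord<>("user-events", userId, payloadJson));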

On Sep 21, 2016 3:01 PM, "Ivan Gozali"  wrote:

> Hi everyone,
>
> I'm very new to Storm, and have read various documentation but haven't
> started using it.
>
> I have a use case where I could potentially have many users producing data
> points that are accumulated in one huge, single Kafka topic/Kinesis stream,
> and I was going to use Storm to "route" per-user mini-streams coming from
> this single huge stream to multiple processors.
>
> I was wondering how this use case is typically handled. I was going to
> create a topology (where the spout consumes the big stream) for each user's
> mini-stream that is then pushed to some new derived stream in
> Kinesis/Kafka, but this doesn't seem right, since there could be 100,000s,
> if not 1,000,000s of users and I would be creating 1,000,000 topics.
>
> Thanks in advance for any advice!
>
> --
> Regards,
>
>
> Ivan Gozali
>


Re: Syncing multiple streams to compute final result from a bolt

2016-09-20 Thread Ambud Sharma
Is this real-time or batch?

If it's batch, this is a perfect fit for MapReduce or Spark.

If it's real-time, then you should use Spark or Storm Trident.

On Sep 20, 2016 9:39 AM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:

> My use case is that I have a json which contains an array. I need to split
> that array into multiple jsons and do some computations on them. After
> that, results from each json has to be used in further calculation
> altogether and come up with the final result.
>
> *Cheers!*
>
> Harsh Choudhary / Software Engineer
>
> Blog / express.harshti.me
>
>
> On Tue, Sep 20, 2016 at 9:09 PM, Ambud Sharma <asharma52...@gmail.com>
> wrote:
>
>> What's your use case?
>>
>> The complexities can be manage d as long as your tuple branching is
>> reasonable I.e. 1 tuple creates several other tuples and you need to sync
>> results between them.
>>
>> On Sep 20, 2016 8:19 AM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:
>>
>>> You're right. For that I have to manage the queue and all those
>>> complexities of timeout. If Storm is not the right place to do this then
>>> what else?
>>>
>>>
>>>
>>> On Tue, Sep 20, 2016 at 8:25 PM, Ambud Sharma <asharma52...@gmail.com>
>>> wrote:
>>>
>>>> The correct way is to perform time window aggregation using bucketing.
>>>>
>>>> Use the timestamp on your event computed from.various stages and send
>>>> it to a single bolt where the aggregation happens. You only emit from this
>>>> bolt once you receive results from both parts.
>>>>
>>>> It's like creating a barrier or the join phase of a fork join pool.
>>>>
>>>> That said the more important question is is Storm the right place do to
>>>> this? When you perform time window aggregation you are susceptible to tuple
>>>> timeouts and have to also deal with making sure your aggregation is
>>>> idempotent.
>>>>
>>>> On Sep 20, 2016 7:49 AM, "Harsh Choudhary" <shry.ha...@gmail.com>
>>>> wrote:
>>>>
>>>>> But how would that solve the syncing problem?
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 20, 2016 at 8:12 PM, Alberto São Marcos <
>>>>> alberto@gmail.com> wrote:
>>>>>
>>>>>> I would dump the *Bolt-A* results in a shared-data-store/queue and
>>>>>> have a separate workflow with another spout and Bolt-B draining from 
>>>>>> there
>>>>>>
>>>>>> On Tue, Sep 20, 2016 at 9:20 AM, Harsh Choudhary <
>>>>>> shry.ha...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> I am thinking of doing the following.
>>>>>>>
>>>>>>> Spout subscribed to Kafka and get JSONs. Spout emits the JSONs as
>>>>>>> individual tuples.
>>>>>>>
>>>>>>> Bolt-A has subscribed to the spout. Bolt-A creates multiple JSONs
>>>>>>> from a json and emits them as multiple streams.
>>>>>>>
>>>>>>> Bolt-B receives these streams and do the computation on them.
>>>>>>>
>>>>>>> I need to make a cumulative result from all the multiple JSONs
>>>>>>> (which are emerged from a single JSON) in a Bolt. But a bolt static
>>>>>>> instance variable is only shared between tasks per worker. How do 
>>>>>>> achieve
>>>>>>> this syncing process.
>>>>>>>
>>>>>>>   --->
>>>>>>> Spout ---> Bolt-A   --->   Bolt-B  ---> Final result
>>>>>>>   --->
>>>>>>>
>>>>>>> The final result is per JSON which was read from Kafka.
>>>>>>>
>>>>>>> Or is there any other way to achieve this better?
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>


Re: Who needs more memory?

2016-09-20 Thread Ambud Sharma
Allocate RAM for the workers that are launched on the supervisor nodes. Workers do
the heavy lifting and are the components that actually run your topology.
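
For example, in storm.yaml on the supervisor nodes (heap sizes are just
placeholders, size them to your topology):

worker.childopts: "-Xmx2048m"       # heap for each worker JVM, where spouts/bolts actually execute
supervisor.childopts: "-Xmx256m"    # the supervisor daemon itself stays small

Zookeeper and Nimbus mostly do coordination and bookkeeping, so they normally
need far less memory than the workers.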

On Sep 20, 2016 11:51 AM, "Thomas Cristanis" 
wrote:

> I am using Storm for an academic experiment and have a question. Where is
> it necessary to allocate more memory (RAM): for Zookeeper, Nimbus, or the
> supervisors? Why?
>


Re: Syncing multiple streams to compute final result from a bolt

2016-09-20 Thread Ambud Sharma
What's your use case?

The complexities can be managed as long as your tuple branching is
reasonable, i.e. one tuple creates several other tuples and you need to sync
results between them.

On Sep 20, 2016 8:19 AM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:

> You're right. For that I have to manage the queue and all those
> complexities of timeout. If Storm is not the right place to do this then
> what else?
>
>
>
> On Tue, Sep 20, 2016 at 8:25 PM, Ambud Sharma <asharma52...@gmail.com>
> wrote:
>
>> The correct way is to perform time window aggregation using bucketing.
>>
>> Use the timestamp on your event computed from.various stages and send it
>> to a single bolt where the aggregation happens. You only emit from this
>> bolt once you receive results from both parts.
>>
>> It's like creating a barrier or the join phase of a fork join pool.
>>
>> That said the more important question is is Storm the right place do to
>> this? When you perform time window aggregation you are susceptible to tuple
>> timeouts and have to also deal with making sure your aggregation is
>> idempotent.
>>
>> On Sep 20, 2016 7:49 AM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:
>>
>>> But how would that solve the syncing problem?
>>>
>>>
>>>
>>> On Tue, Sep 20, 2016 at 8:12 PM, Alberto São Marcos <
>>> alberto@gmail.com> wrote:
>>>
>>>> I would dump the *Bolt-A* results in a shared-data-store/queue and
>>>> have a separate workflow with another spout and Bolt-B draining from there
>>>>
>>>> On Tue, Sep 20, 2016 at 9:20 AM, Harsh Choudhary <shry.ha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I am thinking of doing the following.
>>>>>
>>>>> Spout subscribed to Kafka and get JSONs. Spout emits the JSONs as
>>>>> individual tuples.
>>>>>
>>>>> Bolt-A has subscribed to the spout. Bolt-A creates multiple JSONs from
>>>>> a json and emits them as multiple streams.
>>>>>
>>>>> Bolt-B receives these streams and do the computation on them.
>>>>>
>>>>> I need to make a cumulative result from all the multiple JSONs (which
>>>>> are emerged from a single JSON) in a Bolt. But a bolt static instance
>>>>> variable is only shared between tasks per worker. How do achieve this
>>>>> syncing process.
>>>>>
>>>>>   --->
>>>>> Spout ---> Bolt-A   --->   Bolt-B  ---> Final result
>>>>>   --->
>>>>>
>>>>> The final result is per JSON which was read from Kafka.
>>>>>
>>>>> Or is there any other way to achieve this better?
>>>>>
>>>>
>>>>
>>>
>


Re: Syncing multiple streams to compute final result from a bolt

2016-09-20 Thread Ambud Sharma
The correct way is to perform time-window aggregation using bucketing.

Use the timestamp on your event, computed from the various stages, and send it
to a single bolt where the aggregation happens. You only emit from this bolt
once you receive results from both parts.

It's like creating a barrier, or the join phase of a fork-join pool.

That said, the more important question is: is Storm the right place to do
this? When you perform time-window aggregation you are susceptible to tuple
timeouts and also have to deal with making sure your aggregation is
idempotent.
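
A rough sketch of such a barrier bolt (the "msgId" field, the expected fan-out
of 2 and the combine step are assumptions for illustration, not a drop-in
implementation):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class BarrierBolt extends BaseRichBolt {
    private static final int EXPECTED_PARTS = 2;          // assumed fan-out per original message
    private transient OutputCollector collector;
    private transient Map<String, List<Tuple>> buckets;   // keyed by the original message id

    @Override
    public void prepare(Map conf, TopologyContext ctx, OutputCollector collector) {
        this.collector = collector;
        this.buckets = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        String msgId = tuple.getStringByField("msgId");    // assumed field carried by both branches
        List<Tuple> parts = buckets.computeIfAbsent(msgId, k -> new ArrayList<>());
        parts.add(tuple);
        if (parts.size() == EXPECTED_PARTS) {               // barrier reached: both branches arrived
            collector.emit(parts, new Values(msgId, combine(parts)));  // anchor to every input tuple
            for (Tuple t : parts) {
                collector.ack(t);
            }
            buckets.remove(msgId);
        }
        // incomplete buckets are left pending; if a sibling never arrives, the
        // pending tuples simply time out and the spout replays the message
    }

    private Object combine(List<Tuple> parts) {
        return parts.size();                                // placeholder for the real aggregation
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("msgId", "result"));
    }
}

Route both branches to it with a fieldsGrouping on "msgId" so every part of a
given message lands on the same task.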

On Sep 20, 2016 7:49 AM, "Harsh Choudhary"  wrote:

> But how would that solve the syncing problem?
>
>
>
> On Tue, Sep 20, 2016 at 8:12 PM, Alberto São Marcos  > wrote:
>
>> I would dump the *Bolt-A* results in a shared-data-store/queue and have
>> a separate workflow with another spout and Bolt-B draining from there
>>
>> On Tue, Sep 20, 2016 at 9:20 AM, Harsh Choudhary 
>> wrote:
>>
>>> Hi
>>>
>>> I am thinking of doing the following.
>>>
>>> Spout subscribed to Kafka and get JSONs. Spout emits the JSONs as
>>> individual tuples.
>>>
>>> Bolt-A has subscribed to the spout. Bolt-A creates multiple JSONs from a
>>> json and emits them as multiple streams.
>>>
>>> Bolt-B receives these streams and do the computation on them.
>>>
>>> I need to make a cumulative result from all the multiple JSONs (which
>>> are emerged from a single JSON) in a Bolt. But a bolt static instance
>>> variable is only shared between tasks per worker. How do achieve this
>>> syncing process.
>>>
>>>   --->
>>> Spout ---> Bolt-A   --->   Bolt-B  ---> Final result
>>>   --->
>>>
>>> The final result is per JSON which was read from Kafka.
>>>
>>> Or is there any other way to achieve this better?
>>>
>>
>>
>


Re: Storm 1.0.2 - KafkaSpout cannot find partition information

2016-09-17 Thread Ambud Sharma
I believe you are overriding the value here:

spoutConfig.zkRoot = "/brokers";

On Sep 17, 2016 9:29 AM, "Dominik Safaric" <dominiksafa...@gmail.com> wrote:

> Already configured with an empty string, but unfortunately I keep getting
> the same message claiming no partitions can be found.
>
> Dominik Šafarić
>
> On 17 Sep 2016, at 18:11, Ambud Sharma <asharma52...@gmail.com> wrote:
>
> The Zkroot should be empty string.
>
> On Sep 17, 2016 9:09 AM, "Dominik Safaric" <dominiksafa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I’ve deployed a topology consisting of a KafkaSpout using Kafka 0.10.0.1
>> and Zookeeper 3.4.6. All of the services, including the Nimbus and
>> Supervisor, run on the same instance.
>>
>> However, by examining the worker.log file, I’ve found that the KafkaSpout
>> is unable to find partitions information of the given topic. The following
>> log messages for each of the 12 partitions appear:
>>
>> 2016-09-17 17:37:33.333 o.a.s.k.PartitionManager [INFO] No partition
>> information found, using configuration to determine offset
>> 2016-09-17 17:37:33.333 o.a.s.k.PartitionManager [INFO] Last commit
>> offset from zookeeper: 0
>> 2016-09-17 17:37:33.333 o.a.s.k.PartitionManager [INFO] Commit offset 0
>> is more than 9223372036854775807 behind latest offset 0, resetting to
>> startOffsetTime=-2
>>
>> Whereas, the SpoutConf is instantiated as follows:
>>
>> SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", new String(),
>> UUID.randomUUID().toString());
>> spoutConfig.scheme = new RawMultiScheme();
>> spoutConfig.zkServers = Arrays.asList("localhost");
>> spoutConfig.zkPort = 2181;
>> spoutConfig.zkRoot = "/brokers";
>>
>> By examining Zookeeper’s zNodes using the zkCli script, I’ve found that
>> the partitions information is stored properly as prescribed by the default
>> configuration: /brokers/topics//partitions/…
>>
>> What exactly might be the reason behind KafkaSpout not being able to
>> consume messages from Kafka, i.e. find partition information? Is there a
>> configuration mistake I’ve made?
>>
>> Thanks in advance!
>>
>> Dominik
>>
>


Re: Storm 1.0.2 - KafkaSpout cannot find partition information

2016-09-17 Thread Ambud Sharma
The zkRoot should be an empty string.

On Sep 17, 2016 9:09 AM, "Dominik Safaric"  wrote:

> Hi,
>
> I’ve deployed a topology consisting of a KafkaSpout using Kafka 0.10.0.1
> and Zookeeper 3.4.6. All of the services, including the Nimbus and
> Supervisor, run on the same instance.
>
> However, by examining the worker.log file, I’ve found that the KafkaSpout
> is unable to find partitions information of the given topic. The following
> log messages for each of the 12 partitions appear:
>
> 2016-09-17 17:37:33.333 o.a.s.k.PartitionManager [INFO] No partition
> information found, using configuration to determine offset
> 2016-09-17 17:37:33.333 o.a.s.k.PartitionManager [INFO] Last commit
> offset from zookeeper: 0
> 2016-09-17 17:37:33.333 o.a.s.k.PartitionManager [INFO] Commit offset 0
> is more than 9223372036854775807 behind latest offset 0, resetting to
> startOffsetTime=-2
>
> Whereas, the SpoutConf is instantiated as follows:
>
> SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", new String(),
> UUID.randomUUID().toString());
> spoutConfig.scheme = new RawMultiScheme();
> spoutConfig.zkServers = Arrays.asList("localhost");
> spoutConfig.zkPort = 2181;
> spoutConfig.zkRoot = "/brokers";
>
> By examining Zookeeper’s zNodes using the zkCli script, I’ve found that
> the partitions information is stored properly as prescribed by the default
> configuration: /brokers/topics//partitions/…
>
> What exactly might be the reason behind KafkaSpout not being able to
> consume messages from Kafka, i.e. find partition information? Is there a
> configuration mistake I’ve made?
>
> Thanks in advance!
>
> Dominik
>


Re: SpoutConfig zkRoot argument causing KafkaSpout exception

2016-09-17 Thread Ambud Sharma
It will refer to the root; the spout already puts a slash between the zkRoot
and the consumer id, so as you can see you end up with two slashes in the
path:

*"//bytes3b68b144-e13c-4de3-beed-405e3ca5ae20/partition_1"*



On Sep 17, 2016 12:49 AM, "Dominik Safaric" <dominiksafa...@gmail.com>
wrote:

> If the value is set to an empty string, to what path does it actually
> refer to?
>
> Dominik
>
> On 17 Sep 2016, at 09:40, Ambud Sharma <asharma52...@gmail.com> wrote:
>
> Zkroot should be empty string not a /.
>
> Basically that config refers to the path where the consumer offsets will
> be stored.
>
> On Sep 17, 2016 12:20 AM, "Dominik Safaric" <dominiksafa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I’ve set up a topology consisting of a Kafka spout. But unfortunately, I
>> keep getting the exception *Caused by:
>> java.lang.IllegalArgumentException: Invalid path string
>> "//bytes3b68b144-e13c-4de3-beed-405e3ca5ae20/partition_1" caused by empty
>> node name specified @1*.
>>
>> Zookeeper has the default client port set (i.e. 2181), whereas the
>> brokers path is default as well.
>>
>> I supply SpoutConfig with the following arguments:
>>
>> *SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/", "bytes" +
>> UUID.randomUUID().toString());*
>>
>> However, obviously the problem seems to be in the zkRoot argument I’ve
>> suplied SpoutConfig with.
>>
>> What value should it actually be? What does the zkRoot argument refer to?
>>
>> Thanks in advance!
>>
>


Re: SpoutConfig zkRoot argument causing KafkaSpout exception

2016-09-17 Thread Ambud Sharma
zkRoot should be an empty string, not a "/".

Basically, that config refers to the ZooKeeper path under which the consumer
offsets will be stored.
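
A corrected sketch of the construction from the question below (the ZooKeeper
address and consumer id are placeholders; using a stable id rather than a
random UUID also lets the spout find its previously committed offsets after a
restart):

BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "", "bytes-consumer"); // zkRoot = ""

With zkRoot left empty, the offsets end up under /bytes-consumer/partition_N
instead of the invalid //... path from the error message.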

On Sep 17, 2016 12:20 AM, "Dominik Safaric" 
wrote:

> Hi,
>
> I’ve set up a topology consisting of a Kafka spout. But unfortunately, I
> keep getting the exception *Caused by:
> java.lang.IllegalArgumentException: Invalid path string
> "//bytes3b68b144-e13c-4de3-beed-405e3ca5ae20/partition_1" caused by empty
> node name specified @1*.
>
> Zookeeper has the default client port set (i.e. 2181), whereas the brokers
> path is default as well.
>
> I supply SpoutConfig with the following arguments:
>
> *SpoutConfig spoutConfig = new SpoutConfig(hosts, "bytes", "/", "bytes" +
> UUID.randomUUID().toString());*
>
> However, obviously the problem seems to be in the zkRoot argument I’ve
> suplied SpoutConfig with.
>
> What value should it actually be? What does the zkRoot argument refer to?
>
> Thanks in advance!
>


Re: storm-kafka-client 1.1.0 tag - Multiple Spouts Eventually Freeze

2016-09-14 Thread Ambud Sharma
I have seen that behavior only when running Storm in local mode with no data
flowing in.

This sounds like it might have something to do with ZooKeeper, as in your
offsets in ZooKeeper are either not updated or the watches are not being
triggered for the spout to consume.

Try using zkCli.sh to manually ls the consumer-group znodes and get them
to see what data is there.

Thanks,
Ambud

On Sep 12, 2016 12:03 PM, "Zach Schoenberger"  wrote:

> Hi all,
>
> I am working with the storm-kafka-client and am seeing some odd behavior.
> Whenever I setup a topology with the storm-kafka-client with 2 tasks, 2
> workers, and parallelism hint of 2, I see that the spout will eventually
> only poll 0 events from kafka, even when I have verified that there are new
> events flowing in. The number of partitions is 180 for the topic. The kafka
> version I am using is 0.10.1.
>
> Any help/insight would be greatly appreciated.
>
> Thanks,
> Zach
>


Re: Fwd: Serialize error when storm supervisor starting executor

2016-09-14 Thread Ambud Sharma
Can you post the snippet of your pom.xml file, especially around where
storm-core is imported?

I suspect you are not explicitly excluding dependencies where there is a
conflict in Maven.

What gets serialized is your bolt instance, so its members need to either be
serializable or be marked transient and instantiated in the prepare
method.
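
A sketch of that pattern (ObjectMapper is just an illustrative
non-serializable member; any heavyweight client or parser fits the same
shape):

import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class EnrichBolt extends BaseRichBolt {
    private final String configValue;             // simple serializable state ships with the bolt
    private transient ObjectMapper mapper;        // NOT serialized with the bolt instance
    private transient OutputCollector collector;

    public EnrichBolt(String configValue) {
        this.configValue = configValue;
    }

    @Override
    public void prepare(Map conf, TopologyContext ctx, OutputCollector collector) {
        this.collector = collector;
        this.mapper = new ObjectMapper();          // built on the worker, after deserialization
    }

    @Override
    public void execute(Tuple tuple) {
        // mapper is guaranteed to exist here because prepare() ran on this worker
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams in this sketch
    }
}

On the dependency side, the usual convention is to keep storm-core at
<scope>provided</scope> in the pom and to align (or shade) libraries like
jackson-databind that are also present on the cluster classpath, so the worker
doesn't deserialize against a different version than the one you built with.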

Thanks,
Ambud

On Sep 12, 2016 11:39 AM, "Jianfei Li"  wrote:

>
> Hi,
>
> I got a InvalidClassException in the supervior after uploading the jars to
> storm using the *"storm jar"* command. The exception is as below:
>
> 2016-09-06 21:49:32.649 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor __acker:[8 8]
> 2016-09-06 21:49:32.661 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks __acker:[8 8]
> 2016-09-06 21:49:32.672 org.apache.storm.daemon.executor [INFO][main] 
> Timeouts disabled for executor __acker:[8 8]
> 2016-09-06 21:49:32.673 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor __acker:[8 8]
> 2016-09-06 21:49:32.688 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor EventDistributionBolt:[2 2]
> 2016-09-06 21:49:32.711 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks EventDistributionBolt:[2 2]
> 2016-09-06 21:49:32.713 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor EventDistributionBolt:[2 2]
> 2016-09-06 21:49:32.728 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor SubscriptionFinderBolt:[6 6]
> 2016-09-06 21:49:32.735 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks SubscriptionFinderBolt:[6 6]
> 2016-09-06 21:49:32.742 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor SubscriptionFinderBolt:[6 6]
> 2016-09-06 21:49:32.753 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor __system:[-1 -1]
> 2016-09-06 21:49:32.755 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks __system:[-1 -1]
> 2016-09-06 21:49:32.757 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor __system:[-1 -1]
> 2016-09-06 21:49:32.768 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor __metricscom.xxx.alp.storm.metric.ServiceLogMetricsConsumer:[10 10]
> 2016-09-06 21:49:32.771 org.apache.storm.daemon.executor [INFO][main] Loaded 
> executor tasks 
> __metricscom..alp.storm.metric.ServiceLogMetricsConsumer:[10 10]
> 2016-09-06 21:49:32.777 org.apache.storm.daemon.executor [INFO][main] 
> Finished loading executor 
> __metricscom..alp.storm.metric.ServiceLogMetricsConsumer:[10 10]
> 2016-09-06 21:49:32.789 org.apache.storm.daemon.executor [INFO][main] Loading 
> executor EventStreamSpout:[4 4]
> 2016-09-06 21:49:32.807 org.apache.storm.messaging.netty.Client 
> [ERROR][client-boss-1] connection attempt 2 to 
> Netty-Client-alp-storm-supervisor-2a-i-f478ab2f..com/:6700 
> failed: java.net.ConnectException: Connection refused: 
> alp-storm-supervisor-2a-i-f478ab2f..com/:6700
> 2016-09-06 21:49:32.936 org.apache.storm.daemon.worker [ERROR][main] Error on 
> initialization of server mk-workerjava.lang.RuntimeException: 
> java.io.InvalidClassException: 
> com.fasterxml.jackson.databind.deser.BasicDeserializerFactory; local class 
> incompatible: stream classdesc serialVersionUID = -1426550576764902820, local 
> class serialVersionUID = 2445376702910286321
> at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:179) 
> ~[storm-core-1.0.x.jar:?]
> at org.apache.storm.utils.Utils.getSetComponentObject(Utils.java:430) 
> ~[storm-core-1.0.x.jar:?]
> at org.apache.storm.daemon.task$get_task_object.invoke(task.clj:74) 
> ~[storm-core-1.0.x.jar:?]
> at 
> org.apache.storm.daemon.task$mk_task_data$fn__5625.invoke(task.clj:177) 
> ~[storm-core-1.0.x.jar:?]
> at org.apache.storm.util$assoc_apply_self.invoke(util.clj:930) 
> ~[storm-core-1.0.x.jar:?]
> at org.apache.storm.daemon.task$mk_task_data.invoke(task.clj:170) 
> ~[storm-core-1.0.x.jar:?]
> at ..
>
> From the exception, I figured out it was because the version of
> jackson-databind classes in the uploaded jar was not the same as the
> jackson-databind the supervisor uses. Then I make them the same and
> re-upload the jars again. The executor at the supervisor node could be
> started without issue this time.
>
> Based on this issue, my questions are:
>
>1. Do I have to make all the shared classes between supervisor and the
>jar I uploaded the same version? This seems impossible, since once I
>uploading new jars containing new version or deploying new version of class
>to the supervisor, the issue above would happen.
>2. What is the serialize/deserialize machanism storm uses during
>starting an executor inside the worker process? What would it
>serialize/deserialize?
>  

Re: Join stream with historical data

2016-09-14 Thread Ambud Sharma
Yes, you can build something for data enrichment as long as you use some
sort of fairly sizable LRU cache on the bolt and your event volume is
reasonable, so that the lookups don't become a bottleneck in the topology.
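
A sketch of that shape, assuming a hypothetical database lookup and a
"deviceId"/"payload" tuple layout (the access-ordered LinkedHashMap gives a
simple bounded LRU):

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class EnrichmentBolt extends BaseRichBolt {
    private static final int MAX_ENTRIES = 100_000;     // size to key cardinality and worker heap
    private transient OutputCollector collector;
    private transient Map<String, Object> cache;

    @Override
    public void prepare(Map conf, TopologyContext ctx, OutputCollector collector) {
        this.collector = collector;
        this.cache = new LinkedHashMap<String, Object>(1024, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
                return size() > MAX_ENTRIES;             // evict the least recently used entry
            }
        };
    }

    @Override
    public void execute(Tuple tuple) {
        String key = tuple.getStringByField("deviceId");
        Object history = cache.computeIfAbsent(key, this::lookupFromDatabase); // DB hit only on a miss
        collector.emit(tuple, new Values(key, tuple.getValueByField("payload"), history));
        collector.ack(tuple);
    }

    private Object lookupFromDatabase(String key) {
        return null;                                     // placeholder for the real DAO call
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("deviceId", "payload", "history"));
    }
}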

On Sep 13, 2016 10:43 AM, "Daniela S"  wrote:

> Dear all,
>
> is it possible to add some kind of "historical data" to a data stream in
> Storm? I would like to lookup an entry in a database for every tuple when
> it arrives in Storm and to join the values of this entry with the tuple to
> proceed with processing.
>
> Thank you in advance.
>
> Best regards,
> Daniela
>


Re: How will storm replay the tuple tree?

2016-09-13 Thread Ambud Sharma
Here is a post on it:
https://bryantsai.com/fault-tolerant-message-processing-in-storm/

Point-to-point tracking is expensive unless you are using transactions.
Flume does point-to-point transfers using transactions.
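
To make the two scenarios below concrete, the difference is just which emit
overload BoltA uses (transform() is a placeholder):

public void execute(Tuple input) {
    // 1. Anchored: the new tuple joins the spout tuple's tree, so a failure
    //    anywhere downstream makes the spout replay the original message.
    collector.emit(input, new Values(transform(input)));

    // 2. Unanchored: the new tuple is not tracked, so a downstream failure
    //    does not trigger a replay of the original spout tuple.
    collector.emit(new Values(transform(input)));

    collector.ack(input);
}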

On Sep 13, 2016 3:27 PM, "Tech Id"  wrote:

> I agree with this statement about code/architecture but in case of some
> system outages, like one of the end-points (Solr, Couchbase, Elastic-Search
> etc.) being down temporarily, a very large number of other fully-functional
> and healthy systems will receive a large number of duplicate replays
> (especially in heavy throughput topologies).
>
> If you can elaborate a little more on the performance cost of tracking
> tuples or point to a document reflecting the same, that will be of great
> help.
>
> Best,
> T.I.
>
> On Tue, Sep 13, 2016 at 12:26 PM, Hart, James W.  wrote:
>
>> Failures should be very infrequent, if they are not then rethink the code
>> and architecture.  The performance cost of tracking tuples in the way that
>> would be required to replay at the failure is large, basically that method
>> would slow everything way down for very infrequent failures.
>>
>>
>>
>> *From:* S G [mailto:sg.online.em...@gmail.com]
>> *Sent:* Tuesday, September 13, 2016 3:17 PM
>> *To:* user@storm.apache.org
>> *Subject:* Re: How will storm replay the tuple tree?
>>
>>
>>
>> Hi,
>>
>>
>>
>> I am a little curious to know why we begin at the spout level for case 1.
>>
>> If we replay at the failing bolt's parent level (BoltA in this case),
>> then it should be more performant due to a decrease in duplicate processing
>> (as compared to whole tuple tree replays).
>>
>>
>>
>> If BoltA crashes due to some reason while replaying, only then the Spout
>> should receive this as a failure and whole tuple tree should be replayed.
>>
>>
>>
>> This saving in duplicate processing will be more visible with several
>> layers of bolts.
>>
>>
>>
>> I am sure there is a good reason to replay the whole tuple-tree, and want
>> to know the same.
>>
>>
>>
>> Thanks
>>
>> SG
>>
>>
>>
>> On Tue, Sep 13, 2016 at 10:22 AM, P. Taylor Goetz 
>> wrote:
>>
>> Hi Cheney,
>>
>>
>>
>> Replays happen at the spout level. So if there is a failure at any point
>> in the tuple tree (the tuple tree being the anchored emits, unanchored
>> emits don’t count), the original spout tuple will be replayed. So the
>> replayed tuple will traverse the topology again, including unanchored
>> points.
>>
>>
>>
>> If an unanchored tuple fails downstream, it will not trigger a replay.
>>
>>
>>
>> Hope this helps.
>>
>>
>>
>> -Taylor
>>
>>
>>
>>
>>
>> On Sep 13, 2016, at 4:42 AM, Cheney Chen  wrote:
>>
>>
>>
>> Hi there,
>>
>>
>>
>> We're using storm 1.0.1, and I'm checking through http://storm.apache.or
>> g/releases/1.0.1/Guaranteeing-message-processing.html
>>
>>
>>
>> Got questions for below two scenarios.
>>
>> Assume topology: S (spout) --> BoltA --> BoltB
>>
>> 1. S: anchored emit, BoltA: anchored emit
>>
>> Suppose BoltB processing failed w/ ack, what will the replay be, will it
>> execute both BoltA and BoltB or only failed BoltB processing?
>>
>>
>>
>> 2. S: anchored emit, BoltA: unanchored emit
>>
>> Suppose BoltB processing failed w/ ack, replay will not happen, correct?
>>
>>
>>
>> --
>>
>> Regards,
>> Qili Chen (Cheney)
>>
>> E-mail: tbcql1...@gmail.com
>> MP: (+1) 4086217503
>>
>>
>>
>>
>>
>
>


Re: Re: How will storm replay the tuple tree?

2016-09-13 Thread Ambud Sharma
No, as per the code only individual messages are replayed.

On Sep 13, 2016 6:09 PM, "fanxi...@travelsky.com" 
wrote:

> Hi:
>
> I'd like to clarify something about the Kafka spout with regard to acking.
>
> For example, if the kafka-spout fetches offsets 5000-6000 from the Kafka
> server, but one tuple whose offset is 5101 is failed by a bolt, the whole
> batch of 5000-6000 will remain in the kafka-spout until the 5101 tuple is
> acked. If the 5101 tuple cannot be acked for a long time, the batch
> 5000-6000 will remain for a long time, and the kafka-spout will stop
> fetching data from Kafka during this time.
>
> Am I right?
>
>
> --
> Josh
>
>
>
> *From:* Tech Id 
> *Date:* 2016-09-14 06:26
> *To:* user 
> *Subject:* Re: How will storm replay the tuple tree?
> I agree with this statement about code/architecture but in case of some
> system outages, like one of the end-points (Solr, Couchbase, Elastic-Search
> etc.) being down temporarily, a very large number of other fully-functional
> and healthy systems will receive a large number of duplicate replays
> (especially in heavy throughput topologies).
>
> If you can elaborate a little more on the performance cost of tracking
> tuples or point to a document reflecting the same, that will be of great
> help.
>
> Best,
> T.I.
>
> On Tue, Sep 13, 2016 at 12:26 PM, Hart, James W.  wrote:
>
>> Failures should be very infrequent, if they are not then rethink the code
>> and architecture.  The performance cost of tracking tuples in the way that
>> would be required to replay at the failure is large, basically that method
>> would slow everything way down for very infrequent failures.
>>
>>
>>
>> *From:* S G [mailto:sg.online.em...@gmail.com]
>> *Sent:* Tuesday, September 13, 2016 3:17 PM
>> *To:* user@storm.apache.org
>> *Subject:* Re: How will storm replay the tuple tree?
>>
>>
>>
>> Hi,
>>
>>
>>
>> I am a little curious to know why we begin at the spout level for case 1.
>>
>> If we replay at the failing bolt's parent level (BoltA in this case),
>> then it should be more performant due to a decrease in duplicate processing
>> (as compared to whole tuple tree replays).
>>
>>
>>
>> If BoltA crashes due to some reason while replaying, only then the Spout
>> should receive this as a failure and whole tuple tree should be replayed.
>>
>>
>>
>> This saving in duplicate processing will be more visible with several
>> layers of bolts.
>>
>>
>>
>> I am sure there is a good reason to replay the whole tuple-tree, and want
>> to know the same.
>>
>>
>>
>> Thanks
>>
>> SG
>>
>>
>>
>> On Tue, Sep 13, 2016 at 10:22 AM, P. Taylor Goetz 
>> wrote:
>>
>> Hi Cheney,
>>
>>
>>
>> Replays happen at the spout level. So if there is a failure at any point
>> in the tuple tree (the tuple tree being the anchored emits, unanchored
>> emits don’t count), the original spout tuple will be replayed. So the
>> replayed tuple will traverse the topology again, including unanchored
>> points.
>>
>>
>>
>> If an unanchored tuple fails downstream, it will not trigger a replay.
>>
>>
>>
>> Hope this helps.
>>
>>
>>
>> -Taylor
>>
>>
>>
>>
>>
>> On Sep 13, 2016, at 4:42 AM, Cheney Chen  wrote:
>>
>>
>>
>> Hi there,
>>
>>
>>
>> We're using storm 1.0.1, and I'm checking through http://storm.apache.or
>> g/releases/1.0.1/Guaranteeing-message-processing.html
>>
>>
>>
>> Got questions for below two scenarios.
>>
>> Assume topology: S (spout) --> BoltA --> BoltB
>>
>> 1. S: anchored emit, BoltA: anchored emit
>>
>> Suppose BoltB processing failed w/ ack, what will the replay be, will it
>> execute both BoltA and BoltB or only failed BoltB processing?
>>
>>
>>
>> 2. S: anchored emit, BoltA: unanchored emit
>>
>> Suppose BoltB processing failed w/ ack, replay will not happen, correct?
>>
>>
>>
>> --
>>
>> Regards,
>> Qili Chen (Cheney)
>>
>> E-mail: tbcql1...@gmail.com
>> MP: (+1) 4086217503
>>
>>
>>
>>
>>
>
>


RE: Running a long task in bolt prepare() method

2016-08-24 Thread Ambud Sharma
I would start by increasing the task timeouts,

Config.*NIMBUS_TASK_LAUNCH_SECS*
and
Config.*NIMBUS_TASK_TIMEOUT_SECS*,

so the task doesn't get marked as dead and restarted while it is still
initializing.
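
For example, in storm.yaml (the values are placeholders; the defaults are
considerably lower):

nimbus.task.launch.secs: 300    # grace period before the first heartbeat of a freshly launched task
nimbus.task.timeout.secs: 120   # how long without heartbeats before the task is considered dead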

On Aug 24, 2016 2:17 AM, "Simon Cooper" 
wrote:

> We’re decompressing and deserializing several hundreds-of-megabytes files
> containing data (statistical classifier definitions, mostly) that the bolt
> needs to do its thing. The bolt can’t process events without deserializing
> and indexing the data in those files, which could take anything up to
> several minutes. This can’t easily be farmed out to an external service,
> due to various processing and infrastructure limitations
>
>
>
> SimonC
>
>
>
> *From:* Hart, James W. [mailto:jwh...@seic.com]
> *Sent:* 23 August 2016 15:04
> *To:* user@storm.apache.org
> *Subject:* RE: Running a long task in bolt prepare() method
>
>
>
> Can you elaborate on what kind work is being done at startup?
>
>
>
> If you are building some kind of cacheable lookup data, I would build that
> elsewhere in a persistent cache, like redis, and then fetch and access it
> through redis.
>
>
>
> *From:* Simon Cooper [mailto:simon.coo...@featurespace.co.uk
> ]
> *Sent:* Tuesday, August 23, 2016 9:36 AM
> *To:* user@storm.apache.org
> *Subject:* RE: Running a long task in bolt prepare() method
>
>
>
> We’ve got a similar issue, where the prepare() takes a long time (could be
> up to several minutes), and the bolt can’t process tuples until that is
> completed. The topology seems to send in tuples before the prepare is
> completed, and things go wrong
>
>
>
> We’re having to implement our own mechanism for notification – an external
> way for the bolt to report to the spout that it is ready. This is also an
> issue on multi-worker topologies where one of the workers goes down, is
> recreated, and it’s several minutes before it can process tuples.
>
>
>
> It would be good if there was a way for storm to deal with this, so we
> don’t have to implement our own back-channel back to the spout…
>
>
>
> SimonC
>
>
>
> *From:* Andrea Gazzarini [mailto:gxs...@gmail.com ]
> *Sent:* 23 August 2016 13:08
> *To:* user@storm.apache.org
> *Subject:* Re: Running a long task in bolt prepare() method
>
>
>
> Not sure if there's a "built-in" approach in Storm for doint that. After
> make sure there isn't,  I'd do the following
>
>- I'd start such long task asynchronously in the prepare method and
>I'd register a callback
>- if the execute method logic depends on the completion of such task,
>I'd use a basic state pattern with two states ON/OFF (where the off state
>is basically a NullObject). The callback would be responsible to switch
>the bolt state from OFF (initial state) to ON (working state)
>
> Best,
> Andrea
>
> On 23/08/16 09:12, Xiang Wang wrote:
>
> Hi All,
>
>
>
> I am trying to do some long-time initialisation task in bolt prepare()
> method in local mode.
>
>
>
> I always got error like this:
>
> *WARN  o.a.s.s.o.a.z.s.p.FileTxnLog - fsync-ing the write ahead log in
> SyncThread:0 took 1197ms which will adversely effect operation latency. See
> the ZooKeeper troubleshooting guide*
>
>
>
> And then the task fails.
>
>
>
> Could anyone tell me how to fix this problem? Or is it a good practice to
> run long-time task in prepare() method? If not, what is supposed to be the
> correct way to do it?
>
>
>
> Many thanks for your kind help.
>
>
>
> Best,
>
> Xiang
>
> ---
>
> Xiang Wang, PhD Candidate
>
> Database Research Group
>
> School of Computer Science and Engineering
>
> The University of New South Wales
>
> SYDNEY, AUSTRALIA
>
>
>
>


Re: Is there a way to tell if a tuple has timed out in 0.10

2016-08-22 Thread Ambud Sharma
Zack, that is the way of telling that tuples are timing out.

You infer it from the fact that there are no bolt failures while the spout
shows failures.
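
The window those tuples are racing against is the topology message timeout,
which you can raise if the downstream work is legitimately slow (a sketch;
30 seconds is Storm's default):

Config conf = new Config();
conf.setMessageTimeoutSecs(60);   // tuples not fully acked within 60s are failed back to the spout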

On Aug 22, 2016 1:38 PM, "Zachary Smith"  wrote:

> Hi all,
>
> I have a topology which occasionally has a lot of tuples that fail at the
> spout level, with all the bolts acking. I assume that they're timing out,
> and would like to know if there is any way to tell if a tuple is timing out
> via logging or some other means.
>
> Thanks!
> Zack
>


Fwd: Rule Based Approach for Real Time Stream Processing

2016-08-18 Thread Ambud Sharma
We have built and open-sourced something similar:

https://github.com/symantec/hendrix

Please let me know if we could start building on top of this.

Thanks and regards,
Ambud