Re: ack in downstream when using all grouping method

2016-12-19 Thread Xunyun Liu
Yes, my processing logic is task-id dependent. Thus the behavior of
different bolt instances is similar but not exactly the same. This is also
the reason why I want some instances to be non-critical, so that they do
not affect the ack procedure.

I would like to explore the possibility of modifying the ack logic so that
tuples emitted to non-critical tasks are not anchored. I will report any
progress I have on this matter.
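
For reference, here is a minimal sketch of the idea (class and stream names
are only illustrative): tuples emitted on the critical stream are anchored to
the input, while tuples for the non-critical consumers are emitted unanchored,
so their failure never fails the root tuple.

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SelectiveAnchorBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // Critical consumers: anchored, so a failure downstream replays the root tuple.
        collector.emit("critical", input, new Values(input.getValue(0)));
        // Non-critical consumers: unanchored, so they never affect the ack procedure.
        collector.emit("non-critical", new Values(input.getValue(0)));
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("critical", new Fields("value"));
        declarer.declareStream("non-critical", new Fields("value"));
    }
}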

Best Regards.



On 20 December 2016 at 11:32, Ambud Sharma  wrote:

> Storm is a framework built on replays, fundamentally replays are the way
> guaranteed event processing is accomplished. Typically all Bolt Instances
> in a given registered bolt should be running the same code, unless you are
> doing some logic based on task ids. This implies that behavior of bolt
> instances should be similar as well unless experiencing a hardware failure.
>
> If I am understanding your use case you can either duplicate the data
> outside storm (like write it to separate kafka topics) and have independent
> spouts pick it up while keeping everything in 1 topology.
>
> Grouping however is applied to one stream, you can have more than one
> streams to have a logical separation as well.
>
> I am still unsure about why would you get partial failures unless it's
> frequent supervisor failure, may be you can provide more details about your
> use case.
>
> Lastly ALL groups are usually used for update delivery where
> acknowledgements should matter, however if you can get away with using
> unanchored tuples then that is also an alternative.
>
>
> On Dec 19, 2016 4:17 PM, "Xunyun Liu"  wrote:
>
> Thank you for your answer, Ambud. My use case is that only some of the
> bolt instances are critical that I need them responding to the signal
> through proper acknowledgment. However, the rest of them are non-critical
> which are preferably not to interfere the normal ack process, much like
> receiving an unanchored tuple. Is there any way that I can achieve this?
>
> On 20 December 2016 at 11:01, Ambud Sharma  wrote:
>
>> Forgot to answer your specific question. Storm message id is internal and
>> will be different so you will see a duplicate tuple with a different id.
>>
>> On Dec 19, 2016 3:59 PM, "Ambud Sharma"  wrote:
>>
>>> Yes that is correct. All downstream tuples must be processed for the
>>> root tuple to be acknowledged.
>>>
>>> Type of grouping does not change the acking behavior.
>>>
>>> On Dec 19, 2016 3:53 PM, "Xunyun Liu"  wrote:
>>>
 Hi there,

 As some grouping methods allow sending multiple copies of emitted data
 to downstream bolt instances, I was wondering what will happen if any one
 of them is not able to ack the tuple due to failures. The intrinsic
 question is that, when the all grouping method is used, whether the
 recipients are receiving the exact the same tuple or just duplications with
 different tuple IDs. In the latter case, I believe the tuple tree is
 expanded with regard to the number of parallelisms in downstream and each
 task has to invoke ack() for the root tuple to be fully processed.

 Any idea is much appreciated.


 --
 Best Regards.
 ==
 Xunyun Liu
 ​​


>
>
> --
> Best Regards.
> ==
> Xunyun Liu
> The Cloud Computing and Distributed Systems (CLOUDS) Laboratory,
> The University of Melbourne
>
>
>


-- 
Best Regards.
==
Xunyun Liu
The Cloud Computing and Distributed Systems (CLOUDS) Laboratory,
The University of Melbourne


Re: deploy bolts to a specific supervisor

2016-12-19 Thread Ambud Sharma
Storm workers are supposed to be identical for the most part. You can tune
things a little by setting an odd number of executors compared to the worker
count.

To accomplish what you are trying to do, you can:
1. make these "CPU intensive" bolts register their task ids in ZooKeeper or
another KV store
2. query this information and use direct grouping to manually route the
messages

You will still have bolt instances for the "CPU intensive" workload on your
t1s; however, they will be idle.
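
A rough sketch of step 2 (hedged; component names are hypothetical and the
task-id lookup is a placeholder for whatever KV store you use). The downstream
"CPU intensive" bolt is wired with directGrouping, and this routing bolt picks
a target task id and uses emitDirect:

import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class RoutingBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> cpuIntensiveTaskIds;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Placeholder: in practice this would be the subset of task ids the
        // "CPU intensive" bolts registered in ZooKeeper (i.e. the ones on the
        // large machines); getComponentTasks() is used here only to keep the
        // sketch self-contained.
        this.cpuIntensiveTaskIds = context.getComponentTasks("cpu-intensive-bolt");
    }

    public void execute(Tuple input) {
        // Pick a target task (simple hash-based choice) and route the tuple
        // directly to it, anchored to the input.
        int target = cpuIntensiveTaskIds.get(
                Math.floorMod(input.getString(0).hashCode(), cpuIntensiveTaskIds.size()));
        collector.emitDirect(target, input, new Values(input.getString(0)));
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Streams used with emitDirect must be declared as direct streams.
        declarer.declare(true, new Fields("value"));
    }
}

// Topology wiring: the target bolt must consume via directGrouping, e.g.
// builder.setBolt("cpu-intensive-bolt", new CpuIntensiveBolt(), 4)
//        .directGrouping("routing-bolt");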

On Thu, Nov 24, 2016 at 1:56 PM, Ohad Edelstein  wrote:

> Hey,
>
> We are running a storm application using a single type on instance in AWS
> and a single topology to run our system.
>
> This is causing some resource limitation issues. The way we want to
> address this is by splitting our IO intense bolts into a cluster of a few
> dozens/hundred  t1.small machines (for example) and all our CPU intense
> bolts to a few large machines with lots of cpu & memory.
>
> Basically what i am asking is, is there a way to start all this
> supervisors and then deploy one topology that include cpu intense bolts on
> the big machines and to the small machines the deploy IO bolts?
>
> All documentation I see, is relevant to version 0.9.x
>
> Thanks!
>


Re: Support of topic wildcards in Kafka spout

2016-12-19 Thread Ambud Sharma
No, this is currently not supported. Please open a feature request:
https://issues.apache.org/jira/browse/STORM/ so we can vote on it from a
community perspective and see if others would be interested in developing
this feature.
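
For reference, the underlying new consumer API does accept a pattern
subscription, which is what such a feature would likely build on; below is a
minimal sketch of that client-level call (group id, deserializers and the
AAA.BBB.* pattern are placeholders), not something the current
storm-kafka-client spout exposes:

import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class WildcardConsumerSketch {
    public static KafkaConsumer<String, String> subscribeWildcard(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "wildcard-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // All topics matching AAA.BBB.* are consumed; each partition (and hence
        // each message) is owned by exactly one consumer in the group.
        consumer.subscribe(Pattern.compile("AAA\\.BBB\\..*"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });
        return consumer;
    }
}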

On Tue, Nov 22, 2016 at 5:53 AM, Wijekoon, Manusha <
manusha.wijek...@citi.com> wrote:

> We have a topology that has multiple kafka spout tasks. Each spout task is
> supposed to read a subset of messages from a set of Kafka topics. Topics
> have to be subscribed using a wild card such as AAA.BBB.*. The expected
> behaviour would be that all spout tasks collectively will consume all
> messages in all of the topics that match the wild card. Each message is
> only routed to a single spout task (Ignore failure scenarios). Is this
> currently supported?
>


Re: Apache Storm 1.0.2 integration with ElasticSearch 5.0.0

2016-12-19 Thread Ambud Sharma
That is a wire-level protocol incompatibility with ES, or zen discovery is
disabled, or the nodes are not reachable in Elasticsearch.
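
The stack trace below also shows the storm-elasticsearch 1.0.2 bolt pulling in
the Elasticsearch 1.6.0 transport client, which a 5.0.0 server rejects on port
9300. If you follow the elasticsearch-hadoop suggestion in the reply below, a
hedged sketch of wiring its EsBolt into a topology could look like this (the
index name, component ids and config values are assumptions):

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.topology.TopologyBuilder;
import org.elasticsearch.storm.EsBolt;

public class EsBoltWiring {
    // Attaches an elasticsearch-hadoop EsBolt to an existing stream.
    public static void addEsBolt(TopologyBuilder builder, String upstreamComponent) {
        Map<String, Object> esConf = new HashMap<>();
        // EsBolt talks to the ES REST endpoint (9200), sidestepping the
        // 9300 transport-version mismatch shown in the logs.
        esConf.put("es.nodes", "127.0.0.1");
        esConf.put("es.port", "9200");
        esConf.put("es.input.json", "true"); // each tuple carries one JSON document

        builder.setBolt("friends-es-bolt", new EsBolt("friends/profile", esConf), 2)
               .shuffleGrouping(upstreamComponent);
    }
}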

On Mon, Nov 28, 2016 at 7:21 PM, Zhechao Ma 
wrote:

> As far as I know, storm-elasticsearch still doesn't support elasticsearch
> 5.0. You can use Elasticsearch-Hadoop 5.x instead, which provides the
> class EsBolt for writing data from Storm to Elasticsearch.
>
> https://github.com/elastic/elasticsearch-hadoop
>
> 2016-11-29 10:48 GMT+08:00 Abhishek Samuel :
>
>> Hi,
>>
>> I was writing a bolt to write to elasticsearch 5.0.0 running locally
>> using the examples provided here http://storm.apache.org/r
>> eleases/1.0.2/storm-elasticsearch.html
>>
>> Am using the maven dependency
>>
>> 
>>org.apache.storm
>>storm-elasticsearch
>>1.0.2
>>
>>
>> The code compiled, however at runtime i was getting the below error.
>> Could you please advise on what the issue could be ?
>>
>> Thanks and Regards
>> Abhishek Samuel
>>
>> *On the elasticsearch server terminal*
>>
>> [2016-11-28T18:17:58,810][WARN ][o.e.t.n.Netty4Transport  ] [ZCyigtO]
>> exception caught on transport layer [[id: 0x2f3da3a0, L:/127.0.0.1:9300 -
>> R:/127.0.0.1:52299]], closing connection
>>
>> java.lang.IllegalStateException: Received message from unsupported
>> version: [1.0.0] minimal compatible version is: [5.0.0]
>>
>> at 
>> org.elasticsearch.transport.TcpTransport.messageReceived(TcpTransport.java:1199)
>> ~[elasticsearch-5.0.0.jar:5.0.0]
>>
>> at org.elasticsearch.transport.netty4.Netty4MessageChannelHandl
>> er.channelRead(Netty4MessageChannelHandler.java:74)
>> ~[transport-netty4-5.0.0.jar:5.0.0]
>>
>>
>> *On the java runtime terminal*
>>
>>
>> 8262 [Thread-14-friends-es-bolt-executor[2 2]] ERROR o.a.s.d.executor -
>>
>> org.elasticsearch.client.transport.NoNodeAvailableException: None of the
>> configured nodes are available: []
>>
>> at org.elasticsearch.client.transport.TransportClientNodesServi
>> ce.ensureNodesAreAvailable(TransportClientNodesService.java:305)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at org.elasticsearch.client.transport.TransportClientNodesServi
>> ce.execute(TransportClientNodesService.java:200)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at org.elasticsearch.client.transport.support.InternalTransport
>> Client.execute(InternalTransportClient.java:106)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.client.support.AbstractClient.index(AbstractClient.java:102)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.client.transport.TransportClient.index(TransportClient.java:340)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.action.index.IndexRequestBuilder.doExecute(IndexRequestBuilder.java:266)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:91)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:65)
>> ~[elasticsearch-1.6.0.jar:?]
>>
>> at 
>> org.apache.storm.elasticsearch.bolt.EsIndexBolt.execute(EsIndexBolt.java:64)
>> [storm-elasticsearch-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.daemon.executor$fn__8058$tuple_action_fn__8060.invoke(executor.clj:731)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at 
>> org.apache.storm.daemon.executor$fn__8058$fn__8071$fn__8124.invoke(executor.clj:850)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
>> [storm-core-1.0.2.jar:1.0.2]
>>
>> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>
>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
>>
>
>
>
> --
> Thanks
> Zhechao Ma
>


Re: Clarity on external/storm-kafka-client

2016-12-19 Thread Ambud Sharma
The storm-external project has the Kafka spouts and bolts. Storm doesn't
directly control compatibility with Kafka; that being said, the default
versions of the Kafka integrations will work according to your list, so
the answer is yes regarding the "compatibility".

However, it's still possible to use an older version of Kafka with a newer
version of Storm and vice versa, by either implementing your own spouts /
bolts or carefully overriding the Kafka dependencies pulled into your
topology.



On Wed, Nov 30, 2016 at 1:12 PM, Kristopher Kane 
wrote:

> I need some clarification on Kafka broker version support across Storm
> branches.
>
> storm-kafka-client uses the 'new' Kafka APIs
>
>
> The master branch README of storm-kafka-client says that it ONLY supports
> Kafka version .10 and later.
>
> The 1.x branch README of storm-kafka-client says that it ONLY supports
> Kafka version .10 and later.
>
> The 1.0.x branch README of storm-kafka-client says that it ONLY supports
> Kafka version .9 and later.
>
>
> Is it really true that 1.x and later only support Kafka version .10 and
> later or that it was tested on Kafka .10 but since it is the new API really
> still works on .9 as well  and the wording was never changed in the README?
>
> Thanks,
>
> Kris
>


Re: Does the bolt in between have the ability to re-emit a failed tuple?

2016-12-19 Thread Ambud Sharma
Replaying of tuples is done from the spout, not on a point-to-point basis
like Apache Flume.

Either a tuple is completely processed, i.e. acked by every single bolt in
the pipeline, or it's not; if it's not, it will be replayed by the spout
(provided the spout implements replay logic when its fail() method is called).

Acking has to be done at every stage of the pipeline.

Here's a post that explains the necessary details of the XOR
ledger: https://bryantsai.com/fault-tolerant-message-processing-in-storm/
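
To make the acking requirement concrete, here is a minimal sketch (field and
class names are illustrative, modeled on the Bolt1 from the question below)
that anchors every emit to its input and then acks it; if any tuple in that
tree fails downstream, the spout's fail() is called for the root tuple:

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class Bolt1 extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // Emit several tuples, each anchored to the input, so they all join
        // the root tuple's tree.
        for (int i = 0; i < 3; i++) {
            collector.emit(input, new Values(input.getValue(0), i));
        }
        // Ack the input here; Bolt2 must likewise ack the tuples it receives.
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value", "seq"));
    }
}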

On Wed, Nov 30, 2016 at 9:10 PM, Navin Ipe 
wrote:

> Apart from the previous question, there's also the question of whether we
> should ack the tuple in execute() of Bolt1? Or is it just sufficient to ack
> it in Bolt2?
>
>
> On Wed, Nov 30, 2016 at 12:28 PM, Navin Ipe  com> wrote:
>
>> Hi,
>>
>> Just need a confirmation for this topology:
>>
>> *Spout* ---emit---> *Bolt1* ---emit---> *Bolt2*
>>
>> Spout is BaseRichSpout. Bolt1 and Bolt2 are BaseRichBolt.
>> Spout emits just one tuple per nextTuple() call.
>> Bolt1 anchors to the tuple it received from Spout and emits many
>> different tuple objects.
>>
>> If any of the emits of Bolt1 fails, is there no way for Bolt 1 to re-emit
>> the tuple? Do I have to wait for the topology to figure out that one of
>> Bolt1's tuples failed and then do a re-emit from the Spout?
>>
>> --
>> Regards,
>> Navin
>>
>
>
>
> --
> Regards,
> Navin
>


Re: Storm blobstore expiry

2016-12-19 Thread Ambud Sharma
What type of Blobstore is it?



On Thu, Dec 1, 2016 at 1:57 AM, Mostafa Gomaa  wrote:

> Hello All,
>
> I am using the storm blobstore to store some dynamic dictionaries, however
> I found that the blobstore had been cleared when I restarted the machine.
> Is there a way to make the blobstore non-volatile?
>
> Thanks
>


Re: help on Consuming the data from SSL enabled 0.9 kafka topic

2016-12-19 Thread Ambud Sharma
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_storm-user-guide/content/stormkafka-secure-config.html
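
The link above covers the secure storm-kafka configuration on HDP. As a rough,
hedged sketch of the consumer side, the same SSL properties you already
verified with the console tools go into the Kafka consumer configuration that
the spout is given (hosts, paths and passwords below are placeholders; the
Avro deserializer and schema.registry.url entries assume the Confluent
serializers are on the classpath):

import java.util.Properties;

public class SecureKafkaConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9093");
        props.put("group.id", "storm-avro-consumer");
        props.put("security.protocol", "SSL");
        props.put("ssl.protocol", "TLS");
        props.put("ssl.truststore.location", "/etc/security/kafka.client.truststore.jks");
        props.put("ssl.truststore.password", "changeit");
        props.put("ssl.keystore.location", "/etc/security/kafka.client.keystore.jks");
        props.put("ssl.keystore.password", "changeit");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "https://schema-registry:8081");
        // Hand these to the Kafka spout's consumer configuration
        // (e.g. the storm-kafka-client KafkaSpoutConfig builder).
        return props;
    }
}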

On Mon, Dec 19, 2016 at 4:49 PM, Ambud Sharma 
wrote:

> Not sure if this helps:
>
> On Thu, Dec 1, 2016 at 4:11 AM, Srinivas.Veerabomma <
> srinivas.veerabo...@target.com> wrote:
>
>> Hi,
>>
>>
>>
>> I need some help. Basically looking for some sample Storm code or some
>> suggestions.
>>
>>
>>
>> My requirement is to develop a code in Apache Storm latest version to
>> consume the data from Kafka topic (Kafka version : “0.9.0.1”). This
>> kafka topic is SSL enabled and data is in Avro format. I was able to run
>> the command – “kafka-avro-console-consumer” at the command line and consume
>> the data.
>>
>>
>>
>> I also used the Kafka API code – “KafkaConsumer consumer = 
>> *new *KafkaConsumer(props);” and able to consume the data. 
>> In both the cases,  I provided the required details such as :
>>
>>
>>
>> *bootstrap.servers*
>>
>> *group.id *
>>
>> *client.id *
>>
>> *security.protocol*
>>
>> *ssl.protocol*
>>
>> *ssl.truststore.location*
>>
>> *ssl.truststore.password*
>>
>> *ssl.keystore.location*
>>
>> *ssl.keystore.password*
>>
>> *key.deserializer*
>>
>> *value.deserializer*
>>
>> *schema.registry.url*
>>
>>
>>
>>
>>
>> With the above two options (using Command Line and using Kafka API), I was 
>> able to connect to SSL enabled Kafka topic and read the Avro data and 
>> de-serialize it successfully. I want to achieve the same thing using Apache 
>> Storm code. I did some research and unable to find any sample code. Please 
>> let me know if you have any sample code or what Storm Class to use to 
>> achieve this. Thanks.
>>
>>
>>
>>
>>
>> Regards
>>
>> *Srini Veerabomma |  Big Data Business Intelligence, EDABI COE | **¤**Target
>> India|  GWS Center | Bangalore | Mobile : +91-94825-04323
>> <+91%2094825%2004323>*
>>
>>
>>
>
>


Re: help on Consuming the data from SSL enabled 0.9 kafka topic

2016-12-19 Thread Ambud Sharma
Not sure if this helps:

On Thu, Dec 1, 2016 at 4:11 AM, Srinivas.Veerabomma <
srinivas.veerabo...@target.com> wrote:

> Hi,
>
>
>
> I need some help. Basically looking for some sample Storm code or some
> suggestions.
>
>
>
> My requirement is to develop a code in Apache Storm latest version to
> consume the data from Kafka topic (Kafka version : “0.9.0.1”). This kafka
> topic is SSL enabled and data is in Avro format. I was able to run the
> command – “kafka-avro-console-consumer” at the command line and consume the
> data.
>
>
>
> I also used the Kafka API code – “KafkaConsumer consumer = 
> *new *KafkaConsumer(props);” and able to consume the data. In 
> both the cases,  I provided the required details such as :
>
>
>
> *bootstrap.servers*
>
> *group.id *
>
> *client.id *
>
> *security.protocol*
>
> *ssl.protocol*
>
> *ssl.truststore.location*
>
> *ssl.truststore.password*
>
> *ssl.keystore.location*
>
> *ssl.keystore.password*
>
> *key.deserializer*
>
> *value.deserializer*
>
> *schema.registry.url*
>
>
>
>
>
> With the above two options (using Command Line and using Kafka API), I was 
> able to connect to SSL enabled Kafka topic and read the Avro data and 
> de-serialize it successfully. I want to achieve the same thing using Apache 
> Storm code. I did some research and unable to find any sample code. Please 
> let me know if you have any sample code or what Storm Class to use to achieve 
> this. Thanks.
>
>
>
>
>
> Regards
>
> *Srini Veerabomma |  Big Data Business Intelligence, EDABI COE | **¤**Target
> India|  GWS Center | Bangalore | Mobile : +91-94825-04323
> <+91%2094825%2004323>*
>
>
>


Re: topology.debug always true

2016-12-19 Thread Ambud Sharma
Yes, your reasoning is correct. The topology configuration is overriding the
debug setting; Storm allows a topology to override all of the
topology-specific settings.
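
As a hedged illustration of the override, any topology-level value set in code
at submission time takes precedence over the same topology.* key in storm.yaml
(the topology name is a placeholder):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class DebugOverrideExample {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... setSpout / setBolt calls ...

        Config conf = new Config();
        // Equivalent to putting topology.debug into the conf map; whatever is
        // set here wins over the storm.yaml value for this topology.
        conf.setDebug(false);

        StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
    }
}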

On Fri, Dec 2, 2016 at 3:41 AM, Mostafa Gomaa  wrote:

> I think nimbus configuration is what you have on your nimbus machine,
> "topology.debug" was set to false over there. Topology config are config
> values that start with the "topology." prefix in the yaml file. Your code
> was overriding the debug setting for the topology, so you were seeing two
> different values. I'm not too sure if my reasoning is correct though.
>
> On Dec 2, 2016 1:33 PM, "Ohad Edelstein"  wrote:
>
>> I am embarrassed to say that you are right… thanks!
>> But what is the difference between the two?
>> Thanks
>>
>> From: Mostafa Gomaa 
>> Reply-To: "user@storm.apache.org" 
>> Date: Friday, 2 December 2016 at 13:03
>> To: "user@storm.apache.org" 
>> Cc: "d...@storm.apache.org" 
>> Subject: Re: topology.debug always true
>>
>> Are you sure you're not overriding this configuration in code when
>> building the topology?
>>
>> On Dec 1, 2016 10:07 PM, "Ohad Edelstein"  wrote:
>>
>>> We see that the topology.debug set to two different values:
>>>
>>> And:
>>> We set the topology.debug to false ( we know its set that way to default
>>> but we thought that, this was the problem ).
>>> Is there a different meaning to the param in this two screens?
>>>
>>>
>>>


Re: Worker's Behavior With Heap Limit

2016-12-19 Thread Ambud Sharma
LRU caches are an effective memory management technique for Storm bolts if
lookup is what you are trying to do. However, if you are doing in-memory
aggregations, I highly recommend sticking with standard Java maps and then
checkpointing state to an external data store (HBase, Redis, etc.).

Note:
* Storm aggregations MUST be time bound, else you will run out of memory
* In the Storm community we are trying to use Caffeine for Storm instead of
Guava.
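
For the lookup case, here is a hedged sketch of a time- and size-bounded cache
using the Guava CacheBuilder mentioned below (the bounds are placeholders to
tune against your worker heap):

import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class LookupCacheExample {
    // Entries are evicted once the cache exceeds 100k entries or an entry has
    // not been written for 10 minutes, keeping the worker heap bounded.
    private final Cache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(100_000)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();

    public String lookup(String key) {
        return cache.getIfPresent(key);
    }

    public void put(String key, String value) {
        cache.put(key, value);
    }
}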

On Thu, Dec 8, 2016 at 1:53 AM, Matt Lowe  wrote:

> Depending on what your memory needs to do you might find Guava useful:
> https://github.com/google/guava/wiki/CachesExplained
>
> Its a time/access based cache which will remove elements if they time out.
>
> Its all dependant on what your business logic is though.
>
>
> On 8 December 2016 at 10:48:17, Eranga Heshan (eranga@gmail.com)
> wrote:
>
> I think you are correct because I ran "free -m" command on the terminal
> and saw that free memory was rapidly decreasing. When it was near empty,
> the worker was killed. Then a new worker was started with a new PID.
>
> And yeah, there is a lot of data cached in memory waiting to be executed
> or waiting to be sent over the network. Probably that would be the issue.
>
> I have not heard of Guava caching technique. Instead of caching, I think
> of extending the memory of the server so I could extend the heap assignment
> to the worker.
>
> Thanks,
> Regards,
>
>
>
> Eranga Heshan
> *Undergraduate*
> Computer Science & Engineering
> University of Moratuwa
> Mobile:  +94 71 138 2686 <%2B94%2071%20552%202087>
> Email: era...@wso2.com 
> 
> 
> 
>
> On Thu, Dec 8, 2016 at 2:42 PM, Matt Lowe 
> wrote:
>
>> My guess is the following:
>>
>> 1. Memory limit is reached within worker JVM
>> 2. Worker process starts to throw OutOfMemoryErrors (though this could be
>> silent and not in the logs)
>> 3. JVM will terminate due to lack of memory
>> 4. Supervisor will start a new JVM with new process Id and start a worker
>>
>> This will happen over and over. As storm is a Highly Available service, a
>> new worker will be added if one should die.
>>
>> I assume you are caching a lot of data in your bolt to reach this limit?
>> Have you tried using Guava cache?
>>
>> //Matt
>>
>>
>> On 8 December 2016 at 09:19:43, Matt Lowe (matthewneill...@gmail.com)
>> wrote:
>>
>> Hello Eranga,
>>
>> Do you have the worker/supervisor logs when the worker dies?
>>
>> //Matt
>>
>>
>> On 8 December 2016 at 03:07:39, Eranga Heshan (eranga@gmail.com)
>> wrote:
>>
>> Hi all,
>>
>> In Storm, I configured the worker's maximum heap size to 1900MB. Then I
>> carefully monitored what happens when it reaches the limit.
>>
>> The worker is killed and a new worker is spawned with another process id.
>> Is my observation correct? If so why is that happening?
>>
>> Thanks,
>> Regards,
>>
>>
>> Eranga Heshan
>> *Undergraduate*
>> Computer Science & Engineering
>> University of Moratuwa
>> Mobile:  +94 71 138 2686 <%2B94%2071%20552%202087>
>> Email: era...@wso2.com 
>> 
>> 
>> 
>>
>>
>


Re: How to send multi tick tuple?

2016-12-19 Thread Ambud Sharma
Counting ticks with the modulo operator is the ideal way to do it.

Here's an example for you:
https://github.com/Symantec/hendrix/blob/current/hendrix-alerts/src/main/java/io/symcpe/hendrix/alerts/SuppressionBolt.java#L136


Some slides explaining what's going on:
http://www.slideshare.net/HadoopSummit/in-flux-limiting-for-a-multitenant-logging-service


Hope this helps!
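
For completeness, a minimal sketch of the modulo approach (assuming the base
tick frequency of 1 second from the question; method names are illustrative):
count the ticks in the bolt and fire the 1s / 10s / 60s work when the counter
divides evenly.

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class MultiTimerBolt extends BaseRichBolt {
    private long tickCount = 0;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    }

    public Map<String, Object> getComponentConfiguration() {
        // Ask for one tick tuple per second; the counters below derive the
        // longer intervals from it.
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1);
        return conf;
    }

    public void execute(Tuple tuple) {
        if (Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId())) {
            tickCount++;
            onOneSecond();
            if (tickCount % 10 == 0) {
                onTenSeconds();
            }
            if (tickCount % 60 == 0) {
                onOneMinute();
            }
            return;
        }
        // ... normal tuple handling and acking ...
    }

    private void onOneSecond() { /* every second */ }
    private void onTenSeconds() { /* every ten seconds */ }
    private void onOneMinute() { /* every minute */ }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}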

On Sun, Dec 18, 2016 at 7:15 PM, Chen Junfeng  wrote:

> Currently I use
>
> conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,1);
>
>
>
> to create timer task where it sends a timer tick tuple every one seconds
> as I set. .
>
>
>
> But now I need to create multiple timer tasks, for instance I need to be
> reminded every one second, ten seconds, one minutes on one topology. How
> should I do it?
>
>
>
> Thanks!
>
>
>
> Junfeng Chen
>
>
>
>
>


Re: ack in downstream when using all grouping method

2016-12-19 Thread Ambud Sharma
Storm is a framework built on replays; fundamentally, replays are how
guaranteed event processing is accomplished. Typically all bolt instances
of a given registered bolt should be running the same code, unless you are
doing some logic based on task ids. This implies that the behavior of bolt
instances should be similar as well, unless one is experiencing a hardware
failure.

If I am understanding your use case correctly, you can duplicate the data
outside Storm (e.g. write it to separate Kafka topics) and have independent
spouts pick it up while keeping everything in one topology.

Grouping, however, is applied to one stream; you can have more than one
stream to achieve a logical separation as well.

I am still unsure why you would get partial failures unless there are
frequent supervisor failures; maybe you can provide more details about your
use case.

Lastly, ALL grouping is usually used for update delivery where
acknowledgements should matter; however, if you can get away with using
unanchored tuples then that is also an alternative.


On Dec 19, 2016 4:17 PM, "Xunyun Liu"  wrote:

Thank you for your answer, Ambud. My use case is that only some of the bolt
instances are critical that I need them responding to the signal through
proper acknowledgment. However, the rest of them are non-critical which are
preferably not to interfere the normal ack process, much like receiving an
unanchored tuple. Is there any way that I can achieve this?

On 20 December 2016 at 11:01, Ambud Sharma  wrote:

> Forgot to answer your specific question. Storm message id is internal and
> will be different so you will see a duplicate tuple with a different id.
>
> On Dec 19, 2016 3:59 PM, "Ambud Sharma"  wrote:
>
>> Yes that is correct. All downstream tuples must be processed for the root
>> tuple to be acknowledged.
>>
>> Type of grouping does not change the acking behavior.
>>
>> On Dec 19, 2016 3:53 PM, "Xunyun Liu"  wrote:
>>
>>> Hi there,
>>>
>>> As some grouping methods allow sending multiple copies of emitted data
>>> to downstream bolt instances, I was wondering what will happen if any one
>>> of them is not able to ack the tuple due to failures. The intrinsic
>>> question is that, when the all grouping method is used, whether the
>>> recipients are receiving the exact the same tuple or just duplications with
>>> different tuple IDs. In the latter case, I believe the tuple tree is
>>> expanded with regard to the number of parallelisms in downstream and each
>>> task has to invoke ack() for the root tuple to be fully processed.
>>>
>>> Any idea is much appreciated.
>>>
>>>
>>> --
>>> Best Regards.
>>> ==
>>> Xunyun Liu
>>> ​​
>>>
>>>


-- 
Best Regards.
==
Xunyun Liu
The Cloud Computing and Distributed Systems (CLOUDS) Laboratory,
The University of Melbourne


Re: ack in downstream when using all grouping method

2016-12-19 Thread Xunyun Liu
Thank you for your answer, Ambud. My use case is that only some of the bolt
instances are critical, and I need them to respond to the signal through
proper acknowledgment. However, the rest of them are non-critical and should
preferably not interfere with the normal ack process, much like receiving an
unanchored tuple. Is there any way that I can achieve this?

On 20 December 2016 at 11:01, Ambud Sharma  wrote:

> Forgot to answer your specific question. Storm message id is internal and
> will be different so you will see a duplicate tuple with a different id.
>
> On Dec 19, 2016 3:59 PM, "Ambud Sharma"  wrote:
>
>> Yes that is correct. All downstream tuples must be processed for the root
>> tuple to be acknowledged.
>>
>> Type of grouping does not change the acking behavior.
>>
>> On Dec 19, 2016 3:53 PM, "Xunyun Liu"  wrote:
>>
>>> Hi there,
>>>
>>> As some grouping methods allow sending multiple copies of emitted data
>>> to downstream bolt instances, I was wondering what will happen if any one
>>> of them is not able to ack the tuple due to failures. The intrinsic
>>> question is that, when the all grouping method is used, whether the
>>> recipients are receiving the exact the same tuple or just duplications with
>>> different tuple IDs. In the latter case, I believe the tuple tree is
>>> expanded with regard to the number of parallelisms in downstream and each
>>> task has to invoke ack() for the root tuple to be fully processed.
>>>
>>> Any idea is much appreciated.
>>>
>>>
>>> --
>>> Best Regards.
>>> ==
>>> Xunyun Liu
>>> ​​
>>>
>>>


-- 
Best Regards.
==
Xunyun Liu
The Cloud Computing and Distributed Systems (CLOUDS) Laboratory,
The University of Melbourne


Re: ack in downstream when using all grouping method

2016-12-19 Thread Ambud Sharma
Forgot to answer your specific question. Storm message id is internal and
will be different so you will see a duplicate tuple with a different id.

On Dec 19, 2016 3:59 PM, "Ambud Sharma"  wrote:

> Yes that is correct. All downstream tuples must be processed for the root
> tuple to be acknowledged.
>
> Type of grouping does not change the acking behavior.
>
> On Dec 19, 2016 3:53 PM, "Xunyun Liu"  wrote:
>
>> Hi there,
>>
>> As some grouping methods allow sending multiple copies of emitted data to
>> downstream bolt instances, I was wondering what will happen if any one of
>> them is not able to ack the tuple due to failures. The intrinsic question
>> is that, when the all grouping method is used, whether the recipients are
>> receiving the exact the same tuple or just duplications with different
>> tuple IDs. In the latter case, I believe the tuple tree is expanded with
>> regard to the number of parallelisms in downstream and each task has to
>> invoke ack() for the root tuple to be fully processed.
>>
>> Any idea is much appreciated.
>>
>>
>> --
>> Best Regards.
>> ==
>> Xunyun Liu
>> ​​
>>
>>


Re: ack in downstream when using all grouping method

2016-12-19 Thread Ambud Sharma
Yes that is correct. All downstream tuples must be processed for the root
tuple to be acknowledged.

Type of grouping does not change the acking behavior.

On Dec 19, 2016 3:53 PM, "Xunyun Liu"  wrote:

> Hi there,
>
> As some grouping methods allow sending multiple copies of emitted data to
> downstream bolt instances, I was wondering what will happen if any one of
> them is not able to ack the tuple due to failures. The intrinsic question
> is that, when the all grouping method is used, whether the recipients are
> receiving the exact the same tuple or just duplications with different
> tuple IDs. In the latter case, I believe the tuple tree is expanded with
> regard to the number of parallelisms in downstream and each task has to
> invoke ack() for the root tuple to be fully processed.
>
> Any idea is much appreciated.
>
>
> --
> Best Regards.
> ==
> Xunyun Liu
> ​​
>
>


ack in downstream when using all grouping method

2016-12-19 Thread Xunyun Liu
Hi there,

As some grouping methods allow sending multiple copies of emitted data to
downstream bolt instances, I was wondering what will happen if any one of
them is not able to ack the tuple due to failures. The intrinsic question
is whether, when the all grouping method is used, the recipients are
receiving the exact same tuple or just duplicates with different
tuple IDs. In the latter case, I believe the tuple tree is expanded
according to the downstream parallelism, and each task has to
invoke ack() for the root tuple to be fully processed.

Any idea is much appreciated.


-- 
Best Regards.
==
Xunyun Liu


Re: How to send multi tick tuple?

2016-12-19 Thread Hugo Da Cruz Louro
It is hard to tell without the whole context, but another viable option may be
to have a same-thread timer, similar to what we have in the storm-kafka-client
KafkaSpout. Please take a look here.
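
A minimal sketch of the same-thread timer idea (not the actual
storm-kafka-client Timer class, just an illustration of the pattern): the
spout or bolt checks elapsed time on each call instead of relying on tick
tuples.

import java.util.concurrent.TimeUnit;

// poll() returns true once the period has elapsed and then restarts it.
// It is called from nextTuple()/execute(), so no extra threads are involved.
public class SimpleTimer {
    private final long periodNanos;
    private long start;

    public SimpleTimer(long period, TimeUnit unit) {
        this.periodNanos = unit.toNanos(period);
        this.start = System.nanoTime();
    }

    public boolean poll() {
        long now = System.nanoTime();
        if (now - start >= periodNanos) {
            start = now;
            return true;
        }
        return false;
    }
}

// Usage, e.g. in a spout: one timer per interval.
// private final SimpleTimer tenSeconds = new SimpleTimer(10, TimeUnit.SECONDS);
// private final SimpleTimer oneMinute = new SimpleTimer(1, TimeUnit.MINUTES);
// public void nextTuple() {
//     if (tenSeconds.poll()) { /* 10s work */ }
//     if (oneMinute.poll()) { /* 60s work */ }
// }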

Good luck.
Hugo

On Dec 18, 2016, at 10:06 PM, Chen Junfeng 
> wrote:

So far I count tick tuples to solve this problem. But I am wondering if there 
is a better way ?


From: Matt Lowe
Sent: 19 December 2016 14:00
To: user@storm.apache.org
Subject: Re: How to send multi tick tuple?

Count the ticks?

Best Regards
Matthew Lowe

On 19 Dec 2016, at 04:15, Chen Junfeng 
> wrote:

Currently I use

conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,1);


to create timer task where it sends a timer tick tuple every one seconds as I 
set. .

But now I need to create multiple timer tasks, for instance I need to be 
reminded every one second, ten seconds, one minutes on one topology. How should 
I do it?

Thanks!

Junfeng Chen