[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r130010442
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java 
---
@@ -39,7 +38,7 @@ public KryoTupleDeserializer(final Map 
conf, final GeneralTopolo
 _kryoInput = new Input(1);
 }
 
-public Tuple deserialize(byte[] ser) {
+public TupleImpl deserialize(byte[] ser) {
--- End diff --

Thanks Satish for clarifying. :) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r130010224
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/SimpleWindowPartitionCache.java 
---
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple implementation that evicts the largest un-pinned entry from 
the cache. This works well
+ * for caching window partitions since the access pattern is mostly 
sequential scans.
+ */
+public class SimpleWindowPartitionCache implements 
WindowPartitionCache {
--- End diff --

It is good to have unit tests, including with multiple threads.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r130009889
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java 
---
@@ -39,7 +38,7 @@ public KryoTupleDeserializer(final Map 
conf, final GeneralTopolo
 _kryoInput = new Input(1);
 }
 
-public Tuple deserialize(byte[] ser) {
+public TupleImpl deserialize(byte[] ser) {
--- End diff --

I guess this is made possible since JDK 1.5


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r130009804
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java 
---
@@ -39,7 +38,7 @@ public KryoTupleDeserializer(final Map 
conf, final GeneralTopolo
 _kryoInput = new Input(1);
 }
 
-public Tuple deserialize(byte[] ser) {
+public TupleImpl deserialize(byte[] ser) {
--- End diff --

Thanks! Good to know. Do you know which version of JDK make it possible? 
Was it possible from very first version of Java?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/storm/pull/2241
  
Agree with @HeartSaVioR. If possible lets break this down into multiple 
patches like (1) JCQ replacing disruptor (2) changing the threading model (3) 
micro optimizations and so on which makes it easy to review and benchmark.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r130009128
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java 
---
@@ -39,7 +38,7 @@ public KryoTupleDeserializer(final Map 
conf, final GeneralTopolo
 _kryoInput = new Input(1);
 }
 
-public Tuple deserialize(byte[] ser) {
+public TupleImpl deserialize(byte[] ser) {
--- End diff --

It works because Java supports covariant return types. But I am not sure 
why is this changed since I don't see this being made use of to avoid the 
downcast in the patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2241
  
Just leaving a note to make my requirements clear (it is quite simple): 

- new system doesn't break anything it worked
  - if they're unavoidable it should be discussed from Storm community, in 
worst case we decide to disallow to break 
- we should provide default values for relevant variables which makes most 
of topologies happy
  - for this patch it should show higher throughput and lower latency 
compared to default of master branch
- (optionally) we may want to provide specific value for them which makes 
benchmark topologies (or user topologies which runs full speed all the time) 
happier

We also may want to guide how parameters work, and how to tune them, and 
starting values for several use cases so that users can tweak their own 
topologies and find good values for them. I guess we didn't do that before, but 
maybe great to have.

Please let me know when this patch achieves my requirements. I'd rather 
treat this as WIP and just wait for that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2200: STORM-2616: Documentation for built in metrics

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2200
  
+1 Nice documentation!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2245: Restore issues missing from changelog since fb2446075de20...

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2245
  
As you may know, you can even just made a change if it was wrong. Now you 
also think it's hard to maintain CHANGELOG, let's see how the discussion goes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2245: Restore issues missing from changelog since fb2446075de20...

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2245
  
+1 You can merge this without waiting 24hr.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] Remove CHANGELOG file

2017-07-27 Thread Jungtaek Lim
correction: other projects -> *some* other projects, though they're popular
projects (including in competition)

2017년 7월 28일 (금) 오전 10:51, Jungtaek Lim 님이 작성:

> I'm happy that there're other guys having same difficult and sharing same
> feeling.
>
> This discussion has been initiating several times (from me) and getting
> some +1s for each thread but didn't reach to actual work.
>
> We already utilize JIRA, and I'm subscribing issues@ and taking care of
> issues forgot to mark resolve and/or labeling fixed versions.
> It may sounds ideal for us to let reporters caring about their issues, but
> committers can also help that, and in fact merger is in responsible to take
> care of resolving the issue, so irrelevant to contributor for this side.
>
> My other consideration is that which thing is convenient for release
> manager. Taylor took the release manager all the time (thanks for the great
> work!) and it is directly related to release announcement so would like to
> hear his opinion. If it is more convenient or he think he can tolerate
> that, we can just go on.
>
> Please note that other projects don't use merge commit. Most of the time
> they squash commits in PR into one, labeling commit title as JIRA issue,
> making commit list just as CHANGELOG. That's another thing we discussed
> earlier and I think we need to discuss again, but that can be discussed
> from another thread.
>
> Regarding maintaining contributors: easy to explain. Just take a look at
> what Spark has been doing. Some other projects follow the approach as well.
>
> We can run the script to extract authors of git commits, and just " | sort
> | uniq", and done. Pulling assigner from JIRA issue may be more accurate,
> since it requires actual account whereas author information in commit is
> not strictly required to identify them. We can apply hybrid approach as
> well, but for starter just following git commits looks OK to me.
>
> IMHO they don't feel proud strongly only they're listed in contributors.
> Looking at contribution graph works better in this case, given that it also
> shows commit count and lines of change. (regardless of accuracy)
> It may give more proud to mention them as release announce. It will lead
> contributors to play consistently, trying to participate and be mentioned
> for releases as many as possible. IMO Spark built a great strategy for this
> side, and if we all think it is great, why not follow?
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 2017년 7월 28일 (금) 오전 6:58, Stig Rohde Døssing 님이
> 작성:
>
>> We already have to keep JIRA updated, and keeping JIRA consistent is
>> easier
>> since there isn't one view of the resolved issues for each git branch like
>> we have with CHANGELOG, so there's no worry about e.g. master having a
>> different opinion on solved issues in 1.2.0 than 1.x-branch has.
>>
>> I think we already have the guideline that only small (e.g. typo) changes
>> are okay without a JIRA issue. We're already encouraging one commit per
>> issue, most of the PRs I've seen recently have been squashing before
>> merge.
>> Is this not your experience?
>>
>> I think we have the contributors/committers lists on SVN as well for
>> generating http://storm.apache.org/contribute/People.html at
>> https://svn.apache.org/repos/asf/storm/site/_data/. I think Jungtaek was
>> suggesting keeping the committers list, and generating the contributors
>> list for each release by either commit authors or JIRA assignees, but he
>> can probably elaborate better.
>>
>> 2017-07-27 23:06 GMT+02:00 Hugo Da Cruz Louro :
>>
>> > I am +1 for discontinuing CHANGELOG. However, using JIRA to compile this
>> > info will only work if contributors are very disciplined and consistent
>> > updating JIRA. That leads to the question, is it any easier to maintain
>> > JIRA consistent then it is to keep CHANGELOG consistent? A clean and
>> > consistent JIRA is ideal, as it will also make it easy to create reports
>> > for individual components, etc.
>> >
>> > This discussion touches a proposal I suggested awhile ago, that Storm
>> > community should have a more strict and consistent Git log guideline. In
>> > short, besides very trivial changes, like typos, or one or two line
>> > changes, every feature or bug should be associated with a JIRA.
>> > Furthermore, one commit should correspond to one JIRA, and one JIRA
>> should
>> > be solved by one commit. That means, we should focus on assuring that
>> > commits are squashed, and their titles really reflect the issue they
>> > address, etc.
>> >
>> > Af for the contributors and committers list. If we remove those lists,
>> > where will this information be kept ?
>> >
>> > Hugo
>> >
>> > > On Jul 27, 2017, at 1:44 PM, Stig Rohde Døssing <
>> stigdoess...@gmail.com>
>> > wrote:
>> > >
>> > > Sorry to necro this thread, but I think it's worth bringing this
>> issue up
>> > > again. As Jungtaek mentioned a manual changelog is 

Re: [DISCUSS] Remove CHANGELOG file

2017-07-27 Thread Jungtaek Lim
I'm happy that there're other guys having same difficult and sharing same
feeling.

This discussion has been initiating several times (from me) and getting
some +1s for each thread but didn't reach to actual work.

We already utilize JIRA, and I'm subscribing issues@ and taking care of
issues forgot to mark resolve and/or labeling fixed versions.
It may sounds ideal for us to let reporters caring about their issues, but
committers can also help that, and in fact merger is in responsible to take
care of resolving the issue, so irrelevant to contributor for this side.

My other consideration is that which thing is convenient for release
manager. Taylor took the release manager all the time (thanks for the great
work!) and it is directly related to release announcement so would like to
hear his opinion. If it is more convenient or he think he can tolerate
that, we can just go on.

Please note that other projects don't use merge commit. Most of the time
they squash commits in PR into one, labeling commit title as JIRA issue,
making commit list just as CHANGELOG. That's another thing we discussed
earlier and I think we need to discuss again, but that can be discussed
from another thread.

Regarding maintaining contributors: easy to explain. Just take a look at
what Spark has been doing. Some other projects follow the approach as well.

We can run the script to extract authors of git commits, and just " | sort
| uniq", and done. Pulling assigner from JIRA issue may be more accurate,
since it requires actual account whereas author information in commit is
not strictly required to identify them. We can apply hybrid approach as
well, but for starter just following git commits looks OK to me.

IMHO they don't feel proud strongly only they're listed in contributors.
Looking at contribution graph works better in this case, given that it also
shows commit count and lines of change. (regardless of accuracy)
It may give more proud to mention them as release announce. It will lead
contributors to play consistently, trying to participate and be mentioned
for releases as many as possible. IMO Spark built a great strategy for this
side, and if we all think it is great, why not follow?

Thanks,
Jungtaek Lim (HeartSaVioR)

2017년 7월 28일 (금) 오전 6:58, Stig Rohde Døssing 님이 작성:

> We already have to keep JIRA updated, and keeping JIRA consistent is easier
> since there isn't one view of the resolved issues for each git branch like
> we have with CHANGELOG, so there's no worry about e.g. master having a
> different opinion on solved issues in 1.2.0 than 1.x-branch has.
>
> I think we already have the guideline that only small (e.g. typo) changes
> are okay without a JIRA issue. We're already encouraging one commit per
> issue, most of the PRs I've seen recently have been squashing before merge.
> Is this not your experience?
>
> I think we have the contributors/committers lists on SVN as well for
> generating http://storm.apache.org/contribute/People.html at
> https://svn.apache.org/repos/asf/storm/site/_data/. I think Jungtaek was
> suggesting keeping the committers list, and generating the contributors
> list for each release by either commit authors or JIRA assignees, but he
> can probably elaborate better.
>
> 2017-07-27 23:06 GMT+02:00 Hugo Da Cruz Louro :
>
> > I am +1 for discontinuing CHANGELOG. However, using JIRA to compile this
> > info will only work if contributors are very disciplined and consistent
> > updating JIRA. That leads to the question, is it any easier to maintain
> > JIRA consistent then it is to keep CHANGELOG consistent? A clean and
> > consistent JIRA is ideal, as it will also make it easy to create reports
> > for individual components, etc.
> >
> > This discussion touches a proposal I suggested awhile ago, that Storm
> > community should have a more strict and consistent Git log guideline. In
> > short, besides very trivial changes, like typos, or one or two line
> > changes, every feature or bug should be associated with a JIRA.
> > Furthermore, one commit should correspond to one JIRA, and one JIRA
> should
> > be solved by one commit. That means, we should focus on assuring that
> > commits are squashed, and their titles really reflect the issue they
> > address, etc.
> >
> > Af for the contributors and committers list. If we remove those lists,
> > where will this information be kept ?
> >
> > Hugo
> >
> > > On Jul 27, 2017, at 1:44 PM, Stig Rohde Døssing <
> stigdoess...@gmail.com>
> > wrote:
> > >
> > > Sorry to necro this thread, but I think it's worth bringing this issue
> up
> > > again. As Jungtaek mentioned a manual changelog is easy to break, and
> it
> > > looks like some issues are listed wrong on master and missing from 1.x
> (
> > > https://github.com/apache/storm/pull/2245)
> > >
> > > I think dropping CHANGELOG is a great idea, and we might use Kafka as
> an
> > > example for how to do release notes. They have JIRA generate the notes
> 

Re: Request for holding off commits to storm-client module

2017-07-27 Thread Jungtaek Lim
Now pull request for STORM-2306 is up, and it looks like requiring several
weeks (months?) to be verified.

IMHO the effort of rebasing is unavoidable (though we could put effort to
minimize), especially pull requests were up already. Unfortunately, that's
about who takes the load, and I don't want to ask a favor of rebasing to
authors waiting for weeks and even some months to get their PR merged in.

So at least for earlier pull requests before the patch, I don't find a
reason to block putting them into storm-client module right now. Opinions?

- Jungtaek Lim (HeartSaVioR)

2017년 7월 24일 (월) 오후 4:30, Roshan Naik 님이 작성:

> I am fully ok with rebasing. I don’t expect development on Storm to cease
> while the PR gets reviewed.  Its only changes to storm-client are likely to
> require to be done differently or not needed at all for the new system
> (like some fixes that went recently into Disruptor). So only for changes to
> storm-client module I am asking to see if the devs can work with me if its
> critical to get it in right away… only to avoid duplication of efforts or
> redundant efforts.
>
> Will have a PR out within the next 24 hours.
>
> -roshan
>
>
> On 7/23/17, 7:57 PM, "Jungtaek Lim"  wrote:
>
> Hi Roshan,
>
> I guess rebasing could be necessary even while in review phase, so
> personally I'd rather see the PR first even it has merge conflict
> between
> origin master branch, and have review process with changes in PR, and
> address once review process is done.
>
> If you want to hold off commits just before you submit a pull request,
> and
> you expect to submit it in several days, I'm also OK to wait for that.
> I
> just would like to ensure that holding doesn't stay longer.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 2017년 7월 24일 (월) 오전 7:40, Roshan Naik 님이 작성:
>
> > Storm Devs,
> >   My PR for STORM-2306 (messaging subsystem redesign) is almost
> ready. Not
> > surprisingly, this PR brings extensive modifications in storm-client
> module
> > and touches lightly on some others.
> >
> > Just finished manually rebasing my local changes (to ~90 files) from
> an
> > old version of master on to the latest master this Fri… but  shortly
> after
> > I was done, more commits came into storm-client and need some manual
> > reconciling .. so it’s a bit difficult to keep up with.
> >
> > So, would like to request committers and contributors looking to make
> > changes in this critical module to either wait for this PR to go
> through or
> > work with me to see if we can work something out if its critical.
> >
> > Changes related to core areas like Disruptor, Metrics, Credential
> updates,
> > Worker, Executor, Task, Bolt/Spout output collectors/executors,
> ACKing etc
> > are areas with very high likelihood of merge conflicts.
> >
> > If there are better alternative ideas happy to consider.
> > Thanks for understanding,
> >
> > -roshan
> >
>
>
>


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2241
  
Btw, I think talking about current state is less meaningful. This patch has 
lots of TODO and some critical identified issues, so it should be addressed, 
and after that the number is going to be really different. We may argue same 
things again and again, so maybe better to hold before this patch becomes 
really ready to review (not WIP).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2241
  
I don't think utilizing metrics consumer in TVL is the issue: it might 
matter if results are close so that contributions of other system component 
does matter, but it is just not acceptable latency for low rate. Huge gap 
between twos.

Let's say we get rid of metrics consumer and that makes stable, then are we 
going to pursue users to not use metrics consumer? That doesn't make sense. 
While I don't think so, but if we think metrics consumer contributes throughput 
and/or latency in really odd way, it needs to be validated and fixed.

As you can see my result, CPU was over 100% even with rate 500 and total 
tasks of three key components were 12 (not 48, please keep in mind). All 
results for TVL was captured in that way. So this patch shows high CPU usage in 
baseline (say minimal load) and shows fluctuation by 80% all over rate 1, 
whereas master branch was 20%.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on the issue:

https://github.com/apache/storm/pull/2241
  
@revans2 

About that  "better than sliced bread" : 
how could i not be offended.. at least briefly  ;-) but you could buy me 
lunch if this PR turns out better than you were initially afraid of. It  was 
perhaps a very "low latency" -1 for any PR in the history of Storm. :-) 

Yes there are rough edges and some bugs... but I do dream of being able 
nail it all the way in one go. 

Your observation about the **very high latency for low throughput** topos. 
That is clearly a problem with batch not filling up and not getting flushed. 
The 5 sec latency corresponds to the 'topology.flush.tuple.freq.millis' setting 
(default 5sec). So at each step between Spout->Bolt and Bolt->Bolt if the its 
waiting for 5 sec then you are likely to see such ridiculous latency numbers.

Given that, I think the solution must be evident by now ...  but will state 
it here for the benefit of other readers to whom it may not be:

Tweak one or both of  these two settings:
- **topology.producer.batch.size** : for low throughput topos setting this 
to 1 is a good idea. In the new system the throughput penalty is not that for a 
small batch much compared to larger batch size.  
- **topology.flush.tuple.freq.millis**: You could reduce this for more 
frequent batch flushing. It causes the timer thread to take up more cpu... but 
if throughput fluctuates between very and very low over the day, then is 
setting maybe better to meet latency SLA.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread harshach
Github user harshach commented on the issue:

https://github.com/apache/storm/pull/2241
  
@revans2 I am trying to reproduce the worst-case in your last chart. 
Running TVL topology with 4 spout, 10 splitters, 4 counters, 2 ackers.  Here is 
the code
https://gist.github.com/harshach/73dae347c178ac5dd8651cb0e7902412
Running it via following command against Master and STORM-2306 
`/bin/storm jar /tmp/storm-starter-2.0.0-SNAPSHOT.jar 
org.apache.storm.starter.ThroughputVsLatency 500 1 -c topology.workers=1 -c 
topology.max.spout.pending=500 -c topology.acker.executors=2`

You can look at my results here 
https://docs.google.com/spreadsheets/d/1wPpC3YXp-vTIelRTUVoLxuxIYxUekiIHUpZ2ZEysC4Y/edit#gid=1239810430
 in **sheet 2**
What I see not much difference between Master and STORM-2306. Let me know 
if I am missing something in running this test.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] Remove CHANGELOG file

2017-07-27 Thread Stig Rohde Døssing
We already have to keep JIRA updated, and keeping JIRA consistent is easier
since there isn't one view of the resolved issues for each git branch like
we have with CHANGELOG, so there's no worry about e.g. master having a
different opinion on solved issues in 1.2.0 than 1.x-branch has.

I think we already have the guideline that only small (e.g. typo) changes
are okay without a JIRA issue. We're already encouraging one commit per
issue, most of the PRs I've seen recently have been squashing before merge.
Is this not your experience?

I think we have the contributors/committers lists on SVN as well for
generating http://storm.apache.org/contribute/People.html at
https://svn.apache.org/repos/asf/storm/site/_data/. I think Jungtaek was
suggesting keeping the committers list, and generating the contributors
list for each release by either commit authors or JIRA assignees, but he
can probably elaborate better.

2017-07-27 23:06 GMT+02:00 Hugo Da Cruz Louro :

> I am +1 for discontinuing CHANGELOG. However, using JIRA to compile this
> info will only work if contributors are very disciplined and consistent
> updating JIRA. That leads to the question, is it any easier to maintain
> JIRA consistent then it is to keep CHANGELOG consistent? A clean and
> consistent JIRA is ideal, as it will also make it easy to create reports
> for individual components, etc.
>
> This discussion touches a proposal I suggested awhile ago, that Storm
> community should have a more strict and consistent Git log guideline. In
> short, besides very trivial changes, like typos, or one or two line
> changes, every feature or bug should be associated with a JIRA.
> Furthermore, one commit should correspond to one JIRA, and one JIRA should
> be solved by one commit. That means, we should focus on assuring that
> commits are squashed, and their titles really reflect the issue they
> address, etc.
>
> Af for the contributors and committers list. If we remove those lists,
> where will this information be kept ?
>
> Hugo
>
> > On Jul 27, 2017, at 1:44 PM, Stig Rohde Døssing 
> wrote:
> >
> > Sorry to necro this thread, but I think it's worth bringing this issue up
> > again. As Jungtaek mentioned a manual changelog is easy to break, and it
> > looks like some issues are listed wrong on master and missing from 1.x (
> > https://github.com/apache/storm/pull/2245)
> >
> > I think dropping CHANGELOG is a great idea, and we might use Kafka as an
> > example for how to do release notes. They have JIRA generate the notes
> and
> > put them up alongside each release, for instance
> > https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html. It's
> a
> > much nicer overview of changes than what we have in CHANGELOG where the
> > issue types are mixed, and a manual changelog is easier to get out of
> sync
> > with JIRA. We could probably configure JIRA to generate something similar
> > with a template (guide:
> > https://confluence.atlassian.com/adminjiraserver073/
> creating-release-notes-861253367.html
> > ).
> >
> > 2017-03-10 8:31 GMT+01:00 Jungtaek Lim :
> >
> >> This propose will make merge process simpler, so I guess committers/PMCs
> >> might not have strong opinions about this. I'm concerning mainly about
> how
> >> it will affect release process.
> >>
> >> Taylor, I guess you're busy, but could you give your opinion about this
> as
> >> a release manager? Would removing CHANGELOG hurt or break release
> process?
> >>
> >> And do we need to vote to make progress?
> >>
> >> Thanks,
> >> Jungtaek Lim (HeartSaVioR)
> >>
> >> 2017년 3월 10일 (금) 오후 3:08, Xin Wang 님이 작성:
> >>
> >>> LGTM. I give a +1 for the idea.
> >>>
> >>> 2017-03-07 9:29 GMT+08:00 Jungtaek Lim :
> >>>
>  Bump. I think it's not that trivial for code merger and release
> >> manager,
>  and even contributors (how to represent their contributions.)
> 
>  2017년 2월 24일 (금) 오전 9:43, Roshan Naik 님이 작성:
> 
> > Sounds like a good idea to me.
> > -roshan
> >
> > On 2/23/17, 4:41 PM, "Jungtaek Lim"  wrote:
> >
> >Hi devs,
> >
> >I guess we discussed about this before, but didn't move to actual
>  work.
> >
> >I'd like to propose again removing CHANGELOG file in repository,
> >>> and
> > use
> >JIRA issue's fixed version(s).
> >
> >Maintaining CHANGELOG to file is really easy to break. I've seen
> > several
> >times and most of them is about backport. CHANGELOG file between
> > branches
> >are inconsistent.
> >
> >Suppose we would like to backport the issue to 1.0.x which is
> >> only
> > applied
> >to 2.0.0, then we should fix CHANGELOG from three branches. Easy
> >> to
> > miss
> >and redundant.
> >
> >I'd also like to remove Project leads / Committers / Contributors

[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread harshach
Github user harshach commented on the issue:

https://github.com/apache/storm/pull/2241
  
@revans2 I am trying to reproduce the worst-case in your last chart. 
Running TVL topology with 4 spout, 10 splitters, 4 counters, 2 ackers.  Here is 
the code
https://gist.github.com/harshach/73dae347c178ac5dd8651cb0e7902412
Running it via following command against Master and STORM-2306 
`/bin/storm jar /tmp/storm-starter-2.0.0-SNAPSHOT.jar 
org.apache.storm.starter.ThroughputVsLatency 500 1 -c topology.workers=1 -c 
topology.max.spout.pending=500 -c topology.acker.executors=2`

You can look at my results here 
https://docs.google.com/spreadsheets/d/1wPpC3YXp-vTIelRTUVoLxuxIYxUekiIHUpZ2ZEysC4Y/edit#gid=1239810430
 in **sheet 2**
What I see not much difference between Master and STORM-2306. Let me know 
if I am missing something in running this test.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on the issue:

https://github.com/apache/storm/pull/2241
  
Some points covering prev comments by @HeartSaVioR and @revans2 

**Throughput limiting:** That only makes sense if you are measuring 
Throughput vs CPU/other resource usage.  Latency measurements do not need it. 
And its a sin if you are doing that when trying to measure throughput.

**TVL topology:** 
- Given its rate limiting nature, it definitely does not have the right 
name. Its employment of very high threads counts and rate limiting spouts 
appear to be tuned to work within the limitations of the current msging system 
and target the old sweetspot. Deserves a question.  Harsha's measurements 
(which are more sensible in terms of executor counts), shows that the current 
msging was brought down to its knees very quickly once the rate limiting went 
away.  


@revans2 
The drop you are seeing with the increased in splitter counts is indicative 
of the increased CPU contention going on even when not enough data flowing 
through an executor (the issue you initially brought up... of high CPU usage 
for idle topos).  The old system, executor seems to be spending more time 
sleeping when there is insufficient data flow and less CPU contention and 
adding redundant/idle executors is not affecting it as much.So you can 
throughput plateaus. 

Lowering the CPU contention for idle mode is something i plan to address... 
and i think have left some TODOs for myself in the code already for to keep me 
honest.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #476: STORM-720: Storm.cmd should return ERRORLEVEL befor...

2017-07-27 Thread rtandonmsft
Github user rtandonmsft closed the pull request at:

https://github.com/apache/storm/pull/476


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] Remove CHANGELOG file

2017-07-27 Thread Hugo Da Cruz Louro
I am +1 for discontinuing CHANGELOG. However, using JIRA to compile this info 
will only work if contributors are very disciplined and consistent updating 
JIRA. That leads to the question, is it any easier to maintain JIRA consistent 
then it is to keep CHANGELOG consistent? A clean and consistent JIRA is ideal, 
as it will also make it easy to create reports for individual components, etc.

This discussion touches a proposal I suggested awhile ago, that Storm community 
should have a more strict and consistent Git log guideline. In short, besides 
very trivial changes, like typos, or one or two line changes, every feature or 
bug should be associated with a JIRA. Furthermore, one commit should correspond 
to one JIRA, and one JIRA should be solved by one commit. That means, we should 
focus on assuring that commits are squashed, and their titles really reflect 
the issue they address, etc.

Af for the contributors and committers list. If we remove those lists, where 
will this information be kept ?

Hugo

> On Jul 27, 2017, at 1:44 PM, Stig Rohde Døssing  
> wrote:
> 
> Sorry to necro this thread, but I think it's worth bringing this issue up
> again. As Jungtaek mentioned a manual changelog is easy to break, and it
> looks like some issues are listed wrong on master and missing from 1.x (
> https://github.com/apache/storm/pull/2245)
> 
> I think dropping CHANGELOG is a great idea, and we might use Kafka as an
> example for how to do release notes. They have JIRA generate the notes and
> put them up alongside each release, for instance
> https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html. It's a
> much nicer overview of changes than what we have in CHANGELOG where the
> issue types are mixed, and a manual changelog is easier to get out of sync
> with JIRA. We could probably configure JIRA to generate something similar
> with a template (guide:
> https://confluence.atlassian.com/adminjiraserver073/creating-release-notes-861253367.html
> ).
> 
> 2017-03-10 8:31 GMT+01:00 Jungtaek Lim :
> 
>> This propose will make merge process simpler, so I guess committers/PMCs
>> might not have strong opinions about this. I'm concerning mainly about how
>> it will affect release process.
>> 
>> Taylor, I guess you're busy, but could you give your opinion about this as
>> a release manager? Would removing CHANGELOG hurt or break release process?
>> 
>> And do we need to vote to make progress?
>> 
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>> 
>> 2017년 3월 10일 (금) 오후 3:08, Xin Wang 님이 작성:
>> 
>>> LGTM. I give a +1 for the idea.
>>> 
>>> 2017-03-07 9:29 GMT+08:00 Jungtaek Lim :
>>> 
 Bump. I think it's not that trivial for code merger and release
>> manager,
 and even contributors (how to represent their contributions.)
 
 2017년 2월 24일 (금) 오전 9:43, Roshan Naik 님이 작성:
 
> Sounds like a good idea to me.
> -roshan
> 
> On 2/23/17, 4:41 PM, "Jungtaek Lim"  wrote:
> 
>Hi devs,
> 
>I guess we discussed about this before, but didn't move to actual
 work.
> 
>I'd like to propose again removing CHANGELOG file in repository,
>>> and
> use
>JIRA issue's fixed version(s).
> 
>Maintaining CHANGELOG to file is really easy to break. I've seen
> several
>times and most of them is about backport. CHANGELOG file between
> branches
>are inconsistent.
> 
>Suppose we would like to backport the issue to 1.0.x which is
>> only
> applied
>to 2.0.0, then we should fix CHANGELOG from three branches. Easy
>> to
> miss
>and redundant.
> 
>I'd also like to remove Project leads / Committers / Contributors
>>> in
> README
>(at least Contributors) since it's also easy to break.
> 
>For PMC members we're maintaining it to website and I think
>> that's
> enough.
>For contributors I love what other projects are doing: extract
>>> unique
>contributors name from commits or JIRA issues of release version
>>> and
>mention them from release announce note.
> 
>What do you think?
> 
>Thanks,
>Jungtaek Lim (HeartSaVioR)
> 
> 
> 
 
>>> 
>> 



Re: [DISCUSS] Remove CHANGELOG file

2017-07-27 Thread Stig Rohde Døssing
Sorry to necro this thread, but I think it's worth bringing this issue up
again. As Jungtaek mentioned a manual changelog is easy to break, and it
looks like some issues are listed wrong on master and missing from 1.x (
https://github.com/apache/storm/pull/2245)

I think dropping CHANGELOG is a great idea, and we might use Kafka as an
example for how to do release notes. They have JIRA generate the notes and
put them up alongside each release, for instance
https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html. It's a
much nicer overview of changes than what we have in CHANGELOG where the
issue types are mixed, and a manual changelog is easier to get out of sync
with JIRA. We could probably configure JIRA to generate something similar
with a template (guide:
https://confluence.atlassian.com/adminjiraserver073/creating-release-notes-861253367.html
).

2017-03-10 8:31 GMT+01:00 Jungtaek Lim :

> This propose will make merge process simpler, so I guess committers/PMCs
> might not have strong opinions about this. I'm concerning mainly about how
> it will affect release process.
>
> Taylor, I guess you're busy, but could you give your opinion about this as
> a release manager? Would removing CHANGELOG hurt or break release process?
>
> And do we need to vote to make progress?
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 2017년 3월 10일 (금) 오후 3:08, Xin Wang 님이 작성:
>
> > LGTM. I give a +1 for the idea.
> >
> > 2017-03-07 9:29 GMT+08:00 Jungtaek Lim :
> >
> > > Bump. I think it's not that trivial for code merger and release
> manager,
> > > and even contributors (how to represent their contributions.)
> > >
> > > 2017년 2월 24일 (금) 오전 9:43, Roshan Naik 님이 작성:
> > >
> > > > Sounds like a good idea to me.
> > > > -roshan
> > > >
> > > > On 2/23/17, 4:41 PM, "Jungtaek Lim"  wrote:
> > > >
> > > > Hi devs,
> > > >
> > > > I guess we discussed about this before, but didn't move to actual
> > > work.
> > > >
> > > > I'd like to propose again removing CHANGELOG file in repository,
> > and
> > > > use
> > > > JIRA issue's fixed version(s).
> > > >
> > > > Maintaining CHANGELOG to file is really easy to break. I've seen
> > > > several
> > > > times and most of them is about backport. CHANGELOG file between
> > > > branches
> > > > are inconsistent.
> > > >
> > > > Suppose we would like to backport the issue to 1.0.x which is
> only
> > > > applied
> > > > to 2.0.0, then we should fix CHANGELOG from three branches. Easy
> to
> > > > miss
> > > > and redundant.
> > > >
> > > > I'd also like to remove Project leads / Committers / Contributors
> > in
> > > > README
> > > > (at least Contributors) since it's also easy to break.
> > > >
> > > > For PMC members we're maintaining it to website and I think
> that's
> > > > enough.
> > > > For contributors I love what other projects are doing: extract
> > unique
> > > > contributors name from commits or JIRA issues of release version
> > and
> > > > mention them from release announce note.
> > > >
> > > > What do you think?
> > > >
> > > > Thanks,
> > > > Jungtaek Lim (HeartSaVioR)
> > > >
> > > >
> > > >
> > >
> >
>


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on the issue:

https://github.com/apache/storm/pull/2241
  
For the new messaging system.. the scaling rule of thumb I have found so 
far is quite simple. 

For fast topos (and CPU intensive topos) ... 1 executor thread per 
*physical core*.  It applies to ACKer executors as well. Avoid trying to max 
out on logical cores / hyperthreads.

You are likely to be close to getting the most out of your hardware with 
that rule. You can start with that and try adding/removing one or more 
executors to see if you can squeeze more.  

The older system will typically need more executors per machine to get 
similar numbers (throughput usage).. but  throughput may not come close to 
the new system.

The rule for executors count v/s CPU cores for the existing msgs system 
seems less simple to me.

Trying to run 51 executors on a 4 core machine will surely be a step 
towards "de-scaling", if there is such a word. It is strange that such high 
executor count was useful in the current system.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2245: Restore issues missing from changelog since fb2446...

2017-07-27 Thread srdo
GitHub user srdo opened a pull request:

https://github.com/apache/storm/pull/2245

Restore issues missing from changelog since fb2446075de202387658b0cc8…

…434ee53ded5d35a

See 
https://github.com/apache/storm/commit/fb2446075de202387658b0cc8434ee53ded5d35a

I've grepped the log for the added issue numbers on 1.1.x-branch and the 
associated commits appear to be in 1.x but not 1.1.x, so they likely belong in 
1.2.0. On master these issues are listed in 2.0.0, which is maybe also wrong(?)

The two removed issues are duplicate, 2432 isn't fixed in 1.0.0 based on 
the 1.0.0 tag, and 2343 is listed twice in 1.1.1

ping @HeartSaVioR 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srdo/storm restore-changelog-1.x

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2245.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2245


commit 4ca12ff5c806cfc3991cc9e8ff6cf5a7715b37ce
Author: Stig Rohde Døssing 
Date:   2017-07-27T19:50:10Z

Restore issues missing from changelog since 
fb2446075de202387658b0cc8434ee53ded5d35a




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129946825
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -548,22 +518,14 @@
 public static final String 
TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
 
 /**
- * The size of the Disruptor receive queue for each executor. Must be 
a power of 2.
+ * The size of the receive queue for each executor.
  */
-@isPowerOf2
 public static final String 
TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";
--- End diff --

indeed! my bad.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2241
  
I have another chart now showing a comparison between master and this 
branch, just varying the number of splitter bolts in the topology.  There are 2 
ackers, 4 spouts, and 4 count bolts all within a single worker and with a max 
spout pending set to 500.  All of the configs are the defaults, and it is on my 
laptop like before.


![chart](https://user-images.githubusercontent.com/3441321/28689893-de51856c-72dc-11e7-8f5b-8f2a77efdfd3.png)

The scary thing here is that with these changes there is a tiny window 
where you get "good" throughput (lets say above 150k sentences per second) for 
this branch.  The previous branch has a very very wide window.  The thing that 
concerns me the most with the way it is now, is that there will be a lot of 
people who didn't turn the parallelism as low as possible, because it just 
works, and they will all have their topologies go from 180k/sec down to 
50k/sec.  And tuning them all perfectly to balance on that exact parallelism 
for the given heterogeneous hardware that we run on is going to be impossible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129945279
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java 
---
@@ -21,101 +21,100 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.messaging.IConnection;
 import org.apache.storm.messaging.TaskMessage;
-import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TransferDrainer {
 
-  private Map> bundles = new 
HashMap();
+  private Map bundles = new HashMap();
+
   private static final Logger LOG = 
LoggerFactory.getLogger(TransferDrainer.class);
-  
-  public void add(HashMap 
taskTupleSetMap) {
-for (Map.Entry entry : 
taskTupleSetMap.entrySet()) {
-  addListRefToMap(this.bundles, entry.getKey(), entry.getValue());
+
+  // Cache the msgs grouped by destination node
+  public void add(TaskMessage taskMsg) {
+int destId = taskMsg.task();
+ArrayList msgs = bundles.get(destId);
--- End diff --

thanks for that sweet tip! just to be sure, i would feel comfortable 
verifying with profiler first ... given this is in critical path.   



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129938970
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -122,28 +130,33 @@ public Task(Executor executor, Integer taskId) throws 
IOException {
 return new ArrayList<>(0);
 }
 
+
 public List getOutgoingTasks(String stream, List 
values) {
 if (debug) {
 LOG.info("Emitting Tuple: taskId={} componentId={} stream={} 
values={}", taskId, componentId, stream, values);
 }
 
-List outTasks = new ArrayList<>();
-if (!streamComponentToGrouper.containsKey(stream)) {
-throw new IllegalArgumentException("Unknown stream ID: " + 
stream);
-}
-if (null != streamComponentToGrouper.get(stream)) {
-// null value for __system
-for (LoadAwareCustomStreamGrouping grouper : 
streamComponentToGrouper.get(stream).values()) {
+ArrayList outTasks = new ArrayList<>();
+
+// TODO: PERF: expensive hashtable lookup in critical path
--- End diff --

Pretty much all these todos are intended to get addressed in this PR 
itself. Just wanted to call them out during this review. There are maybe 2 perf 
todo observations that I am hoping to leave in alongside the offending code ..  
as it took a lot of time to identify them (but not easy to fix here). So 
valuable to have them in IMO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129937665
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java 
---
@@ -137,7 +137,7 @@ public void prepare(WorkerTopologyContext context, 
GlobalStreamId stream, List targetTasks;
+private ArrayList targetTasks;
--- End diff --

There is a minor diff.. In particular here I am critically counting on the 
stronger guarantees of const indexing time that is not provided by List<>.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129936657
  
--- Diff: storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java 
---
@@ -30,4 +31,5 @@
 void ack(Tuple input);
 void fail(Tuple input);
 void resetTimeout(Tuple input);
+void flush();
--- End diff --

thanks for that sweet tip! just to be sure, i would feel comfortable 
verifying with profiler first ... given this is in critical path.   

ok Satish let me see about that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129934123
  
--- Diff: conf/defaults.yaml ---
@@ -304,6 +303,7 @@ storm.cgroup.resources:
 storm.cgroup.hierarchy.name: "storm"
 storm.supervisor.cgroup.rootdir: "storm"
 storm.cgroup.cgexec.cmd: "/bin/cgexec"
+storm.cgroup.cgexec.cmd: "/bin/cgexec"
--- End diff --

thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129930495
  
--- Diff: conf/defaults.yaml ---
@@ -231,16 +228,13 @@ topology.multilang.serializer: 
"org.apache.storm.multilang.JsonSerializer"
 topology.shellbolt.max.pending: 100
 topology.skip.missing.kryo.registrations: false
 topology.max.task.parallelism: null
-topology.max.spout.pending: null
+topology.max.spout.pending: null  # TODO: We dont need this any more
--- End diff --

the critical code path is now lock free so should be no room for 
live/dead-locks. To be very precise, its not exactly a blocking queue.. as 
there is a retry loop that we can break out of anytime. There are some changes 
coming soon in this PR.. wrt to what happens when Q is full. will also add that 
info in the design doc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129931296
  
--- Diff: conf/defaults.yaml ---
@@ -231,16 +228,13 @@ topology.multilang.serializer: 
"org.apache.storm.multilang.JsonSerializer"
 topology.shellbolt.max.pending: 100
 topology.skip.missing.kryo.registrations: false
 topology.max.task.parallelism: null
-topology.max.spout.pending: null
+topology.max.spout.pending: null  # TODO: We dont need this any more
 topology.state.synchronization.timeout.secs: 60
-topology.stats.sample.rate: 0.05
+topology.stats.sample.rate: 0.001
--- End diff --

Ok I personally feel that sampling can be reduced a bit, but if you have a 
preference towards retaining it  i am ok with that.  JCQueue is at the core 
messaging system .. not separate. the numbers demonstrate the improvement.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129934167
  
--- Diff: 
examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
 ---
@@ -43,7 +43,7 @@ public Fields getComponentOutputFields(String 
componentId, String streamId) {
 return new Fields("source", "index", "type", "id");
 }
 };
-return new TupleImpl(topologyContext, new Values(source, index, 
type, id), 1, "");
+return new TupleImpl(topologyContext, new Values(source, index, 
type, id), "testSrc", 1, "");
--- End diff --

fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2241#discussion_r129934940
  
--- Diff: 
examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java
 ---
@@ -76,6 +76,8 @@ public static StormTopology getTopology(Map conf) {
 public static void main(String[] args) throws Exception {
 int runTime = -1;
 Config topoConf = new Config();
+topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8);
--- End diff --

Higher numbers for that setting are only useful for very low latency 
spouts. The default setting there assumes that you are likely to be using 
Kafka/Hdfs kind of spouts which have higher latencies (amount of time spent 
inside nextTuple() ).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[RESULT] [VOTE] Release Apache Storm 1.0.4 (rc1)

2017-07-27 Thread P. Taylor Goetz
This vote is now closed and passes with X binding +1 votes and no 0 or -1 votes.

Vote tally (* indicates binding votes):

+1:
Stig Rhode Døssing*
Bobby Evans*
Kishor Patil*
Jungtaek Lim*
P. Taylor Goetz*

I will release the staged artifacts and announce the release after the 24 hour 
waiting period.

-Taylor


> On Jul 24, 2017, at 2:24 PM, P. Taylor Goetz  wrote:
> 
> This is a call to vote on releasing Apache Storm 1.0.4 (rc1)
> 
> Full list of changes in this release:
> 
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;h=50878fab679973a1230466920006dc0746ffddd5;hb=eac433b0beb3798c4723deb39b3c4fad446378f4
> 
> The tag/commit to be voted upon is v1.0.4:
> 
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;hb=a5e1c154b5b2ae74fd78bf10d4c130afb1ad4513
> 
> The source archive being voted upon can be found here:
> 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.4-rc1/apache-storm-1.0.4-src.tar.gz
> 
> Other release files, signatures and digests can be found here:
> 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.4-rc1/
> 
> The release artifacts are signed with the following key:
> 
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd
> 
> The Nexus staging repository for this release is:
> 
> https://repository.apache.org/content/repositories/orgapachestorm-1048
> 
> Please vote on releasing this package as Apache Storm 1.0.4.
> 
> When voting, please list the actions taken to verify the release.
> 
> This vote will be open for at least 72 hours.
> 
> [ ] +1 Release this package as Apache Storm 1.0.4
> [ ]  0 No opinion
> [ ] -1 Do not release this package because...
> 
> Thanks to everyone who contributed to this release.
> 
> -Taylor



Re: [VOTE] Release Apache Storm 1.0.4 (rc1)

2017-07-27 Thread P. Taylor Goetz
+1 (binding)

-Taylor

> On Jul 24, 2017, at 2:24 PM, P. Taylor Goetz  wrote:
> 
> This is a call to vote on releasing Apache Storm 1.0.4 (rc1)
> 
> Full list of changes in this release:
> 
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;h=50878fab679973a1230466920006dc0746ffddd5;hb=eac433b0beb3798c4723deb39b3c4fad446378f4
> 
> The tag/commit to be voted upon is v1.0.4:
> 
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;hb=a5e1c154b5b2ae74fd78bf10d4c130afb1ad4513
> 
> The source archive being voted upon can be found here:
> 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.4-rc1/apache-storm-1.0.4-src.tar.gz
> 
> Other release files, signatures and digests can be found here:
> 
> https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.4-rc1/
> 
> The release artifacts are signed with the following key:
> 
> https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd
> 
> The Nexus staging repository for this release is:
> 
> https://repository.apache.org/content/repositories/orgapachestorm-1048
> 
> Please vote on releasing this package as Apache Storm 1.0.4.
> 
> When voting, please list the actions taken to verify the release.
> 
> This vote will be open for at least 72 hours.
> 
> [ ] +1 Release this package as Apache Storm 1.0.4
> [ ]  0 No opinion
> [ ] -1 Do not release this package because...
> 
> Thanks to everyone who contributed to this release.
> 
> -Taylor



[VOTE] Release Apache Storm 1.1.1 (rc2)

2017-07-27 Thread P. Taylor Goetz
This is a call to vote on releasing Apache Storm 1.1.1 (rc2)

Full list of changes in this release:

https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;hb=41bfea87b1a002565333bd18a06d766af1ca3816

The tag/commit to be voted upon is v1.1.1:

https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=948ce7d63a31fae8c478785985d0ef79808e234e;hb=41bfea87b1a002565333bd18a06d766af1ca3816

The source archive being voted upon can be found here:

https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.1.1-rc2/apache-storm-1.1.1-src.tar.gz

Other release files, signatures and digests can be found here:

https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.1.1-rc2/

The release artifacts are signed with the following key:

https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd

The Nexus staging repository for this release is:

https://repository.apache.org/content/repositories/orgapachestorm-1050

Please vote on releasing this package as Apache Storm 1.1.1.

When voting, please list the actions taken to verify the release.

This vote will be open for at least 72 hours.

[ ] +1 Release this package as Apache Storm 1.1.1
[ ]  0 No opinion
[ ] -1 Do not release this package because...

Thanks to everyone who contributed to this release.

-Taylor

[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2241
  
I have run some more tests looking at modifying the parallelism of 
different components.

First I kept the parallelism of everything else at 4 and modified the acker 
count.

![chart_ackers](https://user-images.githubusercontent.com/3441321/28684646-bc460734-72ca-11e7-9434-8bdf2c263cab.png)

I also kept the ackers at 2 spout and count at 4 and modified the splitter 
count

![chart_splitters](https://user-images.githubusercontent.com/3441321/28684647-bc462f5c-72ca-11e7-91f8-0a4e1c748682.png)

The acker drop off at 5 is really scary,  but adding too many splitters 
also shows a lot of problems.  I am going to try something similar without the 
patch for comparison.

Overall the numbers look really good in some situations, but it is really 
easy to slip into much worse territory.  @knusbaum said that he was able to get 
a multi-worker setup to work, so that is something else I want to explore. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2241
  
Sorry, but we also need to think about low throughput use cases.  I have 
several that I care about and I am seeing very long latency for low throughput. 
 

min in some cases is 5 seconds, max can be up to 20 seconds, average is 
around 10 seconds and the CPU utilization is 500%.  This too needs to be 
addressed.

```
500 1 -c topology.workers=1
uptime:   30 acked: 4,000 acked/sec: 133.33 failed:0 99%:   
9,923,723,263 99.9%:   9,999,220,735 min:  79,036,416 max:  10,015,997,951 
mean: 5,861,829,371.65 stddev: 2,744,502,279.38 user:  0 sys:  
0 gc:  0 mem:   0.00
uptime:   60 acked:15,000 acked/sec: 500.00 failed:0 99%:  
14,646,509,567 99.9%:  14,973,665,279 min:  53,084,160 max:  15,023,996,927 
mean: 7,410,713,531.31 stddev: 3,187,842,885.35 user:  0 sys:  
0 gc:  0 mem:   0.00
uptime:   90 acked:16,000 acked/sec: 533.33 failed:0 99%:  
14,747,172,863 99.9%:  14,990,442,495 min:  37,486,592 max:  15,032,385,535 
mean: 7,947,532,282.45 stddev: 3,104,232,967.22 user:  0 sys:  
0 gc:  0 mem:   0.00
uptime:  120 acked:14,000 acked/sec: 466.67 failed:0 99%:  
14,856,224,767 99.9%:  14,998,831,103 min:  65,208,320 max:  15,023,996,927 
mean: 9,071,752,875.48 stddev: 3,337,053,852.19 user:  0 sys:  
0 gc:  0 mem:   0.00
uptime:  150 acked:13,000 acked/sec: 433.33 failed:0 99%:  
14,914,945,023 99.9%:  14,998,831,103 min:   4,999,610,368 max:  15,074,328,575 
mean: 10,374,946,814.88 stddev: 2,794,778,136.42 user:  0 sys:  
0 gc:  0 mem:   0.00
uptime:  180 acked:16,000 acked/sec: 533.33 failed:0 99%:  
14,940,110,847 99.9%:  15,049,162,751 min:   5,007,998,976 max:  15,602,810,879 
mean: 10,539,964,609.74 stddev: 2,796,155,497.39 user:  0 sys:  
0 gc:  0 mem:   0.00
uptime:  210 acked:15,000 acked/sec: 500.00 failed:0 99%:  
14,881,390,591 99.9%:  14,998,831,103 min:   5,003,804,672 max:  15,015,608,319 
mean: 9,616,077,147.72 stddev: 2,781,415,317.06 user:  0 sys:  
0 gc:  0 mem:   0.00
uptime:  240 acked:10,000 acked/sec: 333.33 failed:0 99%:  
14,889,779,199 99.9%:  15,007,219,711 min:   5,003,804,672 max:  15,015,608,319 
mean: 9,840,073,724.86 stddev: 2,806,028,726.32 user:  0 sys:  
0 gc:  0 mem:   0.00
uptime:  270 acked:16,000 acked/sec: 533.33 failed:0 99%:  
17,951,621,119 99.9%:  19,780,337,663 min:   5,003,804,672 max:  20,015,218,687 
mean: 10,556,609,171.18 stddev: 3,010,780,308.43 user:  0 sys:  
0 gc:  0 mem:   0.00
uptime:  300 acked:15,000 acked/sec: 500.00 failed:0 99%:  
14,898,167,807 99.9%:  14,998,831,103 min:  51,445,760 max:  15,023,996,927 
mean: 9,694,508,448.06 stddev: 3,087,190,409.09 user:  0 sys:  
0 gc:  0 mem:   0.00
```

I am fine with the goals and the design work being done for this.   If you 
can do better then the stuff I did for disruptor by all means rip out my code 
and make things better.  The low throughput issue was one I had to fix with my 
initial patches to disruptor.  People do care about this.  I am not trying to 
be a jerk, I am just trying to keep my customers happy, share some of my 
experience doing something similar in the past, and also hopefully make storm 
much much better in the end.

I apologize if  I offended anyone.  It was not my intention, but I really 
was shocked to see a patch everyone was touting as better than sliced bread 
decidedly worse in every way for a topology that worked really well before.  I 
was able to max out the default configuration of a parallelism of 4 at 100,000 
sentences per second fully acked.  The new patch could only handle 1/3rd of 
that, and not when there is more then 1 worker.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2241
  
@harshach 

Reiterating what @HeartSaVioR said about benchmarking.  Most benchmarking 
is done where you push a system to its limits and see what maximum throughput 
it can do.  This is far from what a real user wants.  It looks good for a 
vendor to brag about I can do X but that other vendor over there can only do Y. 
 But it is close to worthless for what real users want to know.

Real users are trying to balance the cost of the system in $ (CPU time + 
memory used become this, how many EC whatever instances do I need), the amount 
of data that they can push through the system and how quickly they can get 
results back.  Each of these variables are reflected by this test.  In most 
cases a user has a set load that they know they get typically, and a reasonable 
guess at a maximum load that they expect to see.  Also most users have a 
deadline by which the data is no good any more, if not they should be using 
batch.  And a budget that they have to spend on this project, if not call me I 
want to work for you and my salary requirements are very reasonable.

You need to give users tools to explore all three, and because the three 
are intertwined you want to be able to hold one or two of the variables 
constant while you measure the others.  Storm currently has no way to set a 
target SLA (I hope to add one eventually), but you can control the rate at 
which messages arrive and the parallelism of the topology, (which reflects the 
cost).  So the goal is to scan through various throughput values and various 
parallelisms to see what the latency is, and what resources are actually used.  
In the read world we would adjust the heap size and parallelism accordingly.

Complaining about a benchmark creating 51 threads relates to the 
parallelism that we want to explore.  If that is what I did wrong in the 
benchmark I am happy to adjust and reevaluate.  I want to understand how the 
parallelism impacts this code.  The thing that concerns me now is that it 
appears that scaling a topology is very different now, and I want to understand 
exactly how that works.

I cannot easily roll out a change to my customers saying things might get a 
lot better or they might get a lot worse.  We need to make it easy for a user 
with a topology that may not have been ideal (but worked well), to continue to 
work well.






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2241
  
I have updated all the results for TVL with second parameter set to 1. Also 
added rate 5.
The CPU usage from current master doesn't fluctuate from all of rates, even 
5, whereas with this patch the CPU usage sometimes fluctuate around 100%.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2241
  
Let me share a quick test result with passing `1 1` to TVL parameter:

> STORM-2306

```
uptime:   30 acked:   144,000 acked/sec:   4,800.00 failed:0 99%:   
3,070,230,527 99.9%:   3,221,225,471 min:  63,897,600 max:   3,380,609,023 
mean: 1,299,365,069.36 stddev:  685,287,508.16 user:  0 sys:  0 
gc:  0 mem:   0.00
uptime:   60 acked:   303,000 acked/sec:  10,100.00 failed:0 99%:   
3,011,510,271 99.9%:   3,200,253,951 min:  28,540,928 max:   3,303,014,399 
mean: 1,283,728,691.41 stddev:  671,791,145.42 user:  0 sys:  0 
gc:  0 mem:   0.00
uptime:   90 acked:   297,000 acked/sec:   9,900.00 failed:0 99%:   
3,047,161,855 99.9%:   3,307,208,703 min:  62,980,096 max:   3,737,124,863 
mean: 1,283,141,447.64 stddev:  675,126,086.16 user:  0 sys:  0 
gc:  0 mem:   0.00
uptime:  120 acked:   303,000 acked/sec:  10,100.00 failed:0 99%:   
3,047,161,855 99.9%:   3,206,545,407 min:  31,965,184 max:   3,347,054,591 
mean: 1,284,140,763.79 stddev:  690,625,730.54 user:  0 sys:  0 
gc:  0 mem:   0.00
uptime:  150 acked:   299,000 acked/sec:   9,966.67 failed:0 99%:   
3,072,327,679 99.9%:   3,231,711,231 min:  16,703,488 max:   3,414,163,455 
mean: 1,320,620,493.23 stddev:  693,327,734.87 user:  0 sys:  0 
gc:  0 mem:   0.00
uptime:  180 acked:   300,000 acked/sec:  10,000.00 failed:0 99%:   
3,042,967,551 99.9%:   3,248,488,447 min:  48,005,120 max:   3,846,176,767 
mean: 1,313,068,274.86 stddev:  671,810,427.83 user:  0 sys:  0 
gc:  0 mem:   0.00
uptime:  210 acked:   301,000 acked/sec:  10,033.33 failed:0 99%:   
3,061,841,919 99.9%:   3,363,831,807 min:  51,347,456 max:   3,802,136,575 
mean: 1,297,807,219.57 stddev:  678,980,965.35 user:  0 sys:  0 
gc:  0 mem:   0.00
uptime:  240 acked:   301,000 acked/sec:  10,033.33 failed:0 99%:   
3,019,898,879 99.9%:   3,208,642,559 min:  36,962,304 max:   3,363,831,807 
mean: 1,315,037,518.24 stddev:  676,620,121.79 user:  0 sys:  0 
gc:  0 mem:   0.00
uptime:  270 acked:   297,000 acked/sec:   9,900.00 failed:0 99%:   
3,026,190,335 99.9%:   3,200,253,951 min:  52,363,264 max:   3,349,151,743 
mean: 1,308,161,023.51 stddev:  680,121,348.29 user:  0 sys:  0 
gc:  0 mem:   0.00
uptime:  300 acked:   300,000 acked/sec:  10,000.00 failed:0 99%:   
3,021,996,031 99.9%:   3,200,253,951 min:  49,348,608 max:   3,317,694,463 
mean: 1,335,928,012.31 stddev:  667,642,145.32 user:  0 sys:  0 
gc:  0 mem:   0.00
```

CPU usage was around 150 ~ 250%, mostly around 160% which seemed to be a 
bit more stable, but still fluctuating with small rate.

> current

```
uptime:   30 acked:   140,440 acked/sec:   4,681.33 failed:0 99%:   
   14,016,511 99.9%:  26,558,463 min:   2,449,408 max:  52,035,583 
mean:7,800,556.68 stddev:1,790,982.79 user: 28,620 sys:  2,340 
gc:  0 mem:  49.94
uptime:   60 acked:   301,860 acked/sec:  10,062.00 failed:0 99%:   
   11,141,119 99.9%:  15,351,807 min:   3,233,792 max:  26,181,631 
mean:7,479,081.72 stddev:1,175,253.40 user: 30,270 sys:  6,800 
gc:190 mem:  54.88
uptime:   90 acked:   301,600 acked/sec:  10,053.33 failed:0 99%:   
   10,813,439 99.9%:  13,197,311 min:   3,246,080 max:  16,138,239 
mean:7,375,841.06 stddev:1,112,541.35 user: 31,660 sys:  7,160 
gc:194 mem:  54.68
uptime:  120 acked:   301,460 acked/sec:  10,048.67 failed:0 99%:   
   11,042,815 99.9%:  13,828,095 min:   3,266,560 max:  17,285,119 
mean:7,400,672.94 stddev:1,130,409.32 user: 29,650 sys:  7,330 
gc:200 mem:  47.80
uptime:  150 acked:   301,500 acked/sec:  10,050.00 failed:0 99%:   
   10,911,743 99.9%:  13,246,463 min:   3,248,128 max:  15,654,911 
mean:7,399,041.82 stddev:1,118,368.85 user: 29,920 sys:  7,370 
gc:199 mem:  41.95
uptime:  180 acked:   301,540 acked/sec:  10,051.33 failed:0 99%:   
   10,969,087 99.9%:  13,598,719 min:   3,233,792 max:  16,302,079 
mean:7,390,435.62 stddev:1,129,976.16 user: 29,840 sys:  7,190 
gc:201 mem:  41.16
uptime:  210 acked:   301,540 acked/sec:  10,051.33 failed:0 99%:   
   11,182,079 99.9%:  14,557,183 min:   3,246,080 max:  19,513,343 
mean:7,382,121.55 stddev:1,161,863.92 user: 29,620 sys:  7,460 
gc:   

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129809356
  
--- Diff: docs/Windowing.md ---
@@ -266,3 +266,108 @@ tuples can be received within the timeout period.
 An example toplogy `SlidingWindowTopology` shows how to use the apis to 
compute a sliding window sum and a tumbling window 
 average.
 
+## Stateful windowing
+The default windowing implementation in storm stores the tuples in memory 
until they are processed and expired from the 
+window. This limits the use cases to windows that
+fit entirely in memory. Also the source tuples cannot be ack-ed until the 
window expiry requiring large message timeouts
+(topology.message.timeout.secs should be larger than the window length + 
sliding interval). This also puts extra loads 
+due to the complex acking and anchoring requirements.
+ 
+To address the above limitations and to support larger window sizes, storm 
provides stateful windowing support via `IStatefulWindowedBolt`. 
+User bolts should typically extend `BaseStatefulWindowedBolt` for the 
windowing operations with the framework automatically 
+managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of 
the message, the framework can use this
+to periodically checkpoint the last expired and evaluated message ids, to 
avoid duplicate window evaluations in case of 
+failures or restarts. During recovery, the tuples with message ids lower 
than last expired id are discarded and tuples with 
+message id between the last expired and last evaluated message ids are fed 
into the system without activating any previously
+activated windows.
+The tuples beyond the last evaluated message ids are processed as usual. 
This can be enabled by setting
+the `messageIdField` as shown below,
+
+```java
+topologyBuilder.setBolt("mybolt",
+   new MyStatefulWindowedBolt()
+   .withWindow(...) // windowing configuarations
+   .withMessageIdField("msgid"), // a monotonically 
increasing 'long' field in the tuple
+   parallelism)
+   .shuffleGrouping("spout");
+```
+
+However, this option is feasible only if the sources can provide a 
monotonically increasing identifier in the tuple and the same is maintained
+while re-emitting the messages in case of failures. With this option the 
tuples are still buffered in memory until processed
+and expired from the window.
+
+For more details take a look at the sample topology in storm-starter 
[StatefulWindowingTopology](../examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java)
 which will help you get started.
+
+### Window checkpointing
+
+With window checkpointing, the monotonically increasing id is no longer 
required since the framework transparently saves the state of the window 
periodically into the configured state backend.
+The state that is saved includes the tuples in the window, any system 
state that is required to recover the state of processing
+and also the user state.
+
+```java
+topologyBuilder.setBolt("mybolt",
+   new MyStatefulPersistentWindowedBolt()
+   .withWindow(...) // windowing configuarations
+   .withPersistence() // persist the window state
+   .withMaxEventsInMemory(25000), // max number of events 
to be cached in memory
+parallelism)
+   .shuffleGrouping("spout");
+
+```
+
+The `withPersistence` instructs the framework to transparently save the 
tuples in window along with
+any associated system and user state to the state backend. The 
`withMaxEventsInMemory` is an optional 
+configuration that specifies the maximum number of tuples that may be kept 
in memory. The tuples are transparently loaded from 
+the state backend as required and the ones that are most likely to be used 
again are retained in memory.
+
+The state backend can be configured by setting the topology state provider 
config,
+
+```java
+// use redis for state persistence
+conf.put(Config.TOPOLOGY_STATE_PROVIDER, 
"org.apache.storm.redis.state.RedisKeyValueStateProvider");
+
+```
+Currently storm supports Redis and HBase as state backends and uses the 
underlying state-checkpointing
+framework for saving the window state. For more details on state 
checkpointing see [State-checkpointing.md](State-checkpointing.md)
+
+Here is an example of a persistent windowed bolt that uses the window 
checkpointing to save its state. The `initState`
+is invoked with the last saved state (user state) at initialization time. 
The execute method is invoked based on 

[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2241
  
@harshach The second argument is effectively representing worker count: you 
can see that topology set worker count as parallelism. I agree that the name is 
really misleading even I ran tests with topology.workers instead of passing 
second argument. (need to run test again...)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread harshach
Github user harshach commented on the issue:

https://github.com/apache/storm/pull/2241
  
@HeartSaVioR Its not 12 executors per worker.  If you don't pass a 
command-line argument, it sets parallelism variable here to 4 
https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java#L277
and multiplys with 4 here again 
https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java#L359
 . So setting a parallelism unit 16 per component. 
This is nothing to do with how many workers you've.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r128993342
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java
 ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them 
as needed.
+ */
+public class PersistentWindowedBoltExecutor extends 
WindowedBoltExecutor implements IStatefulBolt {
+private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+private final IStatefulWindowedBolt statefulWindowedBolt;
+private transient TopologyContext topologyContext;
--- End diff --

nit: may want to remove this instance as it is never used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129003027
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java
 ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them 
as needed.
+ */
+public class PersistentWindowedBoltExecutor extends 
WindowedBoltExecutor implements IStatefulBolt {
+private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+private final IStatefulWindowedBolt statefulWindowedBolt;
+private transient TopologyContext topologyContext;
+private transient OutputCollector outputCollector;
+private transient WindowState state;
+private transient boolean stateInitialized;
+private transient boolean prePrepared;
+
+public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) {
+super(bolt);
+statefulWindowedBolt = bolt;
+}
+
+@Override
+public void prepare(Map topoConf, TopologyContext 
context, OutputCollector collector) {
+List registrations = (List) 
topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+registrations.add(ConcurrentLinkedQueue.class.getName());
+registrations.add(LinkedList.class.getName());
+registrations.add(AtomicInteger.class.getName());
+registrations.add(EventImpl.class.getName());
+registrations.add(WindowPartition.class.getName());
+registrations.add(DefaultEvictionContext.class.getName());
+topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+prepare(topoConf, context, collector,
+getWindowState(topoConf, context),
+getPartitionState(topoConf, context),
+getWindowSystemState(topoConf, context));
+}
+
+// package access for unit tests
+void prepare(Map topoConf, TopologyContext context, 
OutputCollector collector,
+ KeyValueState 

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129791075
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java
 ---
@@ -0,0 +1,596 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them 
as needed.
+ */
+public class PersistentWindowedBoltExecutor extends 
WindowedBoltExecutor implements IStatefulBolt {
+private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+private final IStatefulWindowedBolt statefulWindowedBolt;
+private transient TopologyContext topologyContext;
+private transient OutputCollector outputCollector;
+private transient WindowState state;
+private transient boolean stateInitialized;
+private transient boolean prePrepared;
+
+public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) {
+super(bolt);
+statefulWindowedBolt = bolt;
+}
+
+@Override
+public void prepare(Map topoConf, TopologyContext 
context, OutputCollector collector) {
+List registrations = (List) 
topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+registrations.add(ConcurrentLinkedQueue.class.getName());
+registrations.add(LinkedList.class.getName());
+registrations.add(AtomicInteger.class.getName());
+registrations.add(EventImpl.class.getName());
+registrations.add(WindowPartition.class.getName());
+registrations.add(DefaultEvictionContext.class.getName());
+topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+prepare(topoConf, context, collector,
+getWindowState(topoConf, context),
+getPartitionState(topoConf, context),
+getWindowSystemState(topoConf, context));
+}
+
+@Override
+protected void validate(Map topoConf,
+

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r128991661
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java
 ---
@@ -151,6 +158,38 @@
 return this;
 }
 
+/**
+ * If set, the stateful windowed bolt would use the backend state for 
window persistence and
+ * only keep a sub-set of events in memory as specified by {@link 
#withMaxEventsInMemory(long)}.
+ */
+public BaseStatefulWindowedBolt withPersistence() {
+persistent = true;
+return this;
+}
+
+/**
+ * The maximum number of window events to keep in memory. This is 
meaningful only if
+ * {@link #withPersistence()} is also set. As the number of events in 
memory grows close
+ * to the maximum, the events that are less likely to be used again 
are evicted and persisted.
+ * The default value for this is {@code 1,000,000}.
+ *
+ * @param maxEventsInMemory the maximum number of window events to 
keep in memory
+ */
+public BaseStatefulWindowedBolt withMaxEventsInMemory(long 
maxEventsInMemory) {
--- End diff --

We can explore later limiting with memory size as that would give better 
handle to the user on memory resources, which can be with onheap/offheap cache. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129803794
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/WindowPartitionCache.java ---
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * A loading cache abstraction for caching {@link 
PersistentWindowedBoltExecutor.WindowPartition}.
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface WindowPartitionCache {
+
+/**
+ * Get value from the cache or load the value.
+ *
+ * @param key the key
+ * @return the value
+ */
+V get(K key);
+
+/**
+ * Get value from the cache or load the value pinning it
+ * so that the entry will never get evicted.
+ *
+ * @param key the key
+ * @return the value
+ */
+V getPinned(K key);
--- End diff --

This method name can be changed to `pinAndGet` as `getPinned` was implies 
getting the value if it is pinned.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129760482
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java
 ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them 
as needed.
+ */
+public class PersistentWindowedBoltExecutor extends 
WindowedBoltExecutor implements IStatefulBolt {
+private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+private final IStatefulWindowedBolt statefulWindowedBolt;
+private transient TopologyContext topologyContext;
+private transient OutputCollector outputCollector;
+private transient WindowState state;
+private transient boolean stateInitialized;
+private transient boolean prePrepared;
+
+public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) {
+super(bolt);
+statefulWindowedBolt = bolt;
+}
+
+@Override
+public void prepare(Map topoConf, TopologyContext 
context, OutputCollector collector) {
+List registrations = (List) 
topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+registrations.add(ConcurrentLinkedQueue.class.getName());
+registrations.add(LinkedList.class.getName());
+registrations.add(AtomicInteger.class.getName());
+registrations.add(EventImpl.class.getName());
+registrations.add(WindowPartition.class.getName());
+registrations.add(DefaultEvictionContext.class.getName());
+topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+prepare(topoConf, context, collector,
+getWindowState(topoConf, context),
+getPartitionState(topoConf, context),
+getWindowSystemState(topoConf, context));
+}
+
+// package access for unit tests
+void prepare(Map topoConf, TopologyContext context, 
OutputCollector collector,
+ KeyValueState 

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r128992641
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/IStatefulWindowedBolt.java ---
@@ -15,12 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.topology;
 
 import org.apache.storm.state.State;
 
 /**
- * A windowed bolt abstraction for supporting windowing operation with 
state
+ * A windowed bolt abstraction for supporting windowing operation with 
state.
  */
 public interface IStatefulWindowedBolt extends 
IStatefulComponent, IWindowedBolt {
+/**
+ * If the stateful windowed bolt should have its windows persisted in 
state and maintain a subset
+ * (recent events) in memory.
--- End diff --

this maybe frequent events instead of recent events. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread satishd
Github user satishd commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129804787
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/SimpleWindowPartitionCache.java 
---
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple implementation that evicts the largest un-pinned entry from 
the cache. This works well
+ * for caching window partitions since the access pattern is mostly 
sequential scans.
+ */
+public class SimpleWindowPartitionCache implements 
WindowPartitionCache {
+private static final Logger LOG = 
LoggerFactory.getLogger(SimpleWindowPartitionCache.class);
+
+private final ConcurrentSkipListMap map = new 
ConcurrentSkipListMap<>();
+private final Map pinned = new HashMap<>();
+private final long maximumSize;
+private final RemovalListener removalListener;
+private final CacheLoader cacheLoader;
+private final ReentrantLock lock = new ReentrantLock(true);
+private int size;
+
+@Override
+public V get(K key) {
+return getOrLoad(key, false);
+}
+
+@Override
+public V getPinned(K key) {
+return getOrLoad(key, true);
+}
+
+@Override
+public boolean unpin(K key) {
+LOG.debug("unpin '{}'", key);
+boolean res = false;
+try {
+lock.lock();
+if (pinned.compute(key, (k, v) -> v == null ? 0 : v - 1) <= 0) 
{
+pinned.remove(key);
+res = true;
+}
+} finally {
+lock.unlock();
+}
+LOG.debug("pinned '{}'", pinned);
+return res;
+}
+
+@Override
+public ConcurrentMap asMap() {
+return map;
+}
+
+@Override
+public void invalidate(K key) {
+try {
+lock.lock();
+if (isPinned(key)) {
+LOG.debug("Entry '{}' is pinned, skipping invalidation", 
key);
+} else {
+LOG.debug("Invalidating entry '{}'", key);
+V val = map.remove(key);
+if (val != null) {
+--size;
+pinned.remove(key);
+removalListener.onRemoval(key, val, 
RemovalCause.EXPLICIT);
+}
+}
+} finally {
+lock.unlock();
+}
+}
+
+// Get or load from the cache optionally pinning the entry
+// so that it wont get evicted from the cache
+private V getOrLoad(K key, boolean shouldPin) {
+V val;
+if (shouldPin) {
+try {
+lock.lock();
+val = load(key);
+pin(key);
+} finally {
+lock.unlock();
+}
+} else {
+val = map.get(key);
--- End diff --

Why are we checking here as this is already checked in `load(key)`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread Alexandre Vermeerbergen
Hello Roshan,

Thank you for your detailed answer. Details are important, because in my
organization, I am often asked to re-assess the reasons why we chose Storm
over its competitors.

Best regards,
Alexandre Vermeerbergen


2017-07-25 23:36 GMT+02:00 roshannaik :

> Github user roshannaik commented on the issue:
>
> https://github.com/apache/storm/pull/2241
>
> @avermeer Looks like SuperChief blog is relaying the same basic claims
> that Heron has marketed. Since you ask, i will share my opinions wrt
> Heron's claims.
>
> - Heron has never been a player in the high performance club. They
> have been smart about not comparing themselves with the real top performers
> of the day. I only included them here because they have built they have
> made much noise against Storm. They are smart about not mentioning which
> version of Storm they are comparing with (how does a paper with such
> critical info missing get accepted ?). That creates an illusion in people
> that their perf claims apply to all versions of Storm in general... even if
> Storm [publishes new perf numbers](hortonworks.com/blog/
> microbenchmarking-storm-1-0-performance/) comparing itself to a prior
> version.
> - Heron's threading model (1 thread per process.. based on what i
> gather from their articles), is really primitive for this application
> domain.  I don't recommend it, but by setting 'topology.workers' equal to
> the number of spout& bolt instances, Storm can be run in Heron mode.
> -  I find it much easier to debug a process with multiple components
> using a debugger rather start a separate debugger for every instance of
> spout bolt running. Also, I would imagine, having so many processes means
> you have an explosion of log files to deal with when triaging.
> - Unclear why the recovery model (when worker process crashes) is any
> better ... the same kind of replay from the spout would be required. The
> gains may be minor if any. Making minor optimizations to the failure path
> and penalizing the normal operation path... is backwards.
> - Cant get a stack from a Storm worker ? Thats clearly false. Try it
> yourself. I do it all the time. Heapdumps, on the other hand, can stall the
> worker and if the heap size is really large the supervisor might feel the
> worker is having a problem. There are timeouts that you can increase to for
> the supervisor to wait longer. I cant imagine that Heron doesn't monitor
> their workers and restart them if they are not responsive.
> -  Heron's Backpressure model is simply too overweight, but marketed
> as a novel idea.
> - A quick read of their latest perf blog, noted in the comparison, and
> it was evident that they missed recognizing their real perf problem.
>
>
>
> ---
> If your project is set up for it, you can reply to this email and have your
> reply appear on GitHub as well. If your project does not have this feature
> enabled and wishes so, or if the feature is enabled but not working, please
> contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
> with INFRA.
> ---
>


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2241
  
@harshach 
For ThroughputvsLatency, throttling spout is intended. We set desired 
throughput and see histogram of latency and other metrics. (CPU, GC, etc.) 
There're 3 components in topology which parallelism would be set to 4 * worker 
count so total 12 executor threads per worker. I think we can parameterize the 
magic number 4 and adjust it while testing too.

I have also done with some performance tests, without modifying TVL 
topology. The reason is that we should also care about 
non-performance-maximized topology. For benchmarking performance maximized 
topology we also have ConstSpoutIdBoltNullBoltTopo, so let's not modify TVL and 
verify this patch works with all the cases.

Since this patch doesn't seem to handle inter-worker communication 
properly, the test set what we can do for now is very limited.

Here's my machine spec used for performance test:

```
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
```

```
OS: Ubuntu 17.04
CPU: AMD Ryzen 5 1600 3.2Ghz 6 core (with hyper-thread = 12 logical cores)
RAM: Samsung DDR4 32G 19200
SSD: Samsung 850 Evo
```

and here's my number (just pasted as raw number):

https://docs.google.com/spreadsheets/d/1J3S4R68CsazlINF60rQn4XCy2Hx5QNzqVoWSO9qo2tc/edit?usp=sharing

My observation is that this patch looks impressive with performance 
maximized topology, but this also looks really bad (not acceptable) with 
relatively idle topology. I've observed all the things what @revans2 observed 
with TVL tests. But this patch looks stable with ConstSpoutIdBoltNullBoltTopo 
and even CPU usage seems lower than stock in this test.

While we often publicize micro-benchmark result, in practice users would 
run much idle topologies.
I'm OK if things can be stabilized with adjusting parameters (if then I 
think default value should be here), but if not, it should be addressed before 
accepting the patch. I would be -1 if TVL result is not stable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.

2017-07-27 Thread harshach
Github user harshach commented on the issue:

https://github.com/apache/storm/pull/2241
  
@revans2 @HeartSaVioR 
Here are my findings 
https://docs.google.com/spreadsheets/d/1wPpC3YXp-vTIelRTUVoLxuxIYxUekiIHUpZ2ZEysC4Y/edit#gid=1644511...


1. Looking at ThroughputvsLatency I found some issues:
 - By default it adds 51 total threads , that IMO is incorrect when 
benchmarking in a 4-core machine. 
 
 - Also it adds two bolts for logging/measurements which might be impacting 
the numbers

https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/...

https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/...
 
 - It also throttles the spout

https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/...

I did the following changes:
- Disable the HTTP and Logging bolts
- Disable throttling spout, we want spout to run as fast as it can
- reduced the executor counts

If you see lines from 78 - 102. 

Apache Master clearly couldn't handle the faster spout and starts timing 
out. Perf degrades considerably and very quickly. Where as STORM-2306 not only 
was able to handle the faster spout and delivered stable and processing at more 
start out being 10x faster then improves to 35x faster compared to master.


2. Also ran storm-perf topologies ConstSpoutIdNullBoltIdTopo and 
ConstSpoutNullBoltTopo. These topologies are trying to see whats the message 
throughput and latency when there are only 2 components involved without 
including any external dependencies. Essentially testing the messaging system.

From line 3-45 you can see with this patch we are getting under 10ms 
(depends on the topology) compare to an avg of 250ms+. (with batchSize=1)

3. Also ran storm-examples ThroughputVsLatency with 2 workers. Here there 
is clearly a bug which is prevent inter-worker communication so don't have 
comparative numbers.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2218: STORM-2614: Enhance stateful windowing to persist the win...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/storm/pull/2218
  
@srdo, addressed your comments, let me know if I missed something.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763588
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java
 ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them 
as needed.
+ */
+public class PersistentWindowedBoltExecutor extends 
WindowedBoltExecutor implements IStatefulBolt {
+private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+private final IStatefulWindowedBolt statefulWindowedBolt;
+private transient TopologyContext topologyContext;
+private transient OutputCollector outputCollector;
+private transient WindowState state;
+private transient boolean stateInitialized;
+private transient boolean prePrepared;
+
+public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) {
+super(bolt);
+statefulWindowedBolt = bolt;
+}
+
+@Override
+public void prepare(Map topoConf, TopologyContext 
context, OutputCollector collector) {
+List registrations = (List) 
topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+registrations.add(ConcurrentLinkedQueue.class.getName());
+registrations.add(LinkedList.class.getName());
+registrations.add(AtomicInteger.class.getName());
+registrations.add(EventImpl.class.getName());
+registrations.add(WindowPartition.class.getName());
+registrations.add(DefaultEvictionContext.class.getName());
+topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+prepare(topoConf, context, collector,
+getWindowState(topoConf, context),
+getPartitionState(topoConf, context),
+getWindowSystemState(topoConf, context));
+}
+
+// package access for unit tests
+void prepare(Map topoConf, TopologyContext context, 
OutputCollector collector,
+ KeyValueState 

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763692
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/SimpleWindowPartitionCache.java 
---
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple implementation that evicts the largest un-pinned entry from 
the cache. This works well
+ * for caching window partitions since the access pattern is mostly 
sequential scans.
+ */
+public class SimpleWindowPartitionCache implements 
WindowPartitionCache {
+private static final Logger LOG = 
LoggerFactory.getLogger(SimpleWindowPartitionCache.class);
+
+private final ConcurrentSkipListMap map = new 
ConcurrentSkipListMap<>();
+private final Map pinned = new HashMap<>();
+private final long maximumSize;
+private final RemovalListener removalListener;
+private final CacheLoader cacheLoader;
+private final ReentrantLock lock = new ReentrantLock(true);
+private int size;
+
+@Override
+public V get(K key) {
+return getOrLoad(key, false);
+}
+
+@Override
+public V getPinned(K key) {
+return getOrLoad(key, true);
+}
+
+@Override
+public boolean unpin(K key) {
+LOG.debug("unpin '{}'", key);
+boolean res = false;
+try {
+lock.lock();
+if (pinned.compute(key, (k, v) -> v == null ? 0 : v - 1) <= 0) 
{
+pinned.remove(key);
+res = true;
+}
+} finally {
+lock.unlock();
+}
+LOG.debug("pinned '{}'", pinned);
+return res;
+}
+
+@Override
+public ConcurrentMap asMap() {
+return map;
+}
+
+@Override
+public void invalidate(K key) {
+try {
+lock.lock();
+if (isPinned(key)) {
+LOG.debug("Entry '{}' is pinned, skipping invalidation", 
key);
+} else {
+LOG.debug("Invalidating entry '{}'", key);
+V val = map.remove(key);
+if (val != null) {
+--size;
+pinned.remove(key);
+removalListener.onRemoval(key, val, 
RemovalCause.EXPLICIT);
+}
+}
+} finally {
+lock.unlock();
+}
+}
+
+// Get or load from the cache optionally pinning the entry
+// so that it wont get evicted from the cache
+private V getOrLoad(K key, boolean shouldPin) {
+V val;
+if (shouldPin) {
+try {
+lock.lock();
+val = load(key);
+pin(key);
+} finally {
+lock.unlock();
+}
+} else {
+val = map.get(key);
+if (val == null) {
+try {
+lock.lock();
+val = load(key);
+} finally {
+lock.unlock();
+}
+}
+}
+
+return val;
+}
+
+private V load(K key) {
+V val = map.get(key);
+if (val == null) {
+val = cacheLoader.load(key);
+if (val == null) {
+throw new NullPointerException("Null value");
+}
+   

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763781
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/windowing/TupleWindowIterImpl.java ---
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.windowing;
+
+import com.google.common.collect.Iterators;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * An iterator based implementation over the events in a window.
+ */
+public class TupleWindowIterImpl implements TupleWindow {
+private final Supplier tuplesIt;
+private final Supplier newTuplesIt;
+private final Supplier expiredTuplesIt;
+private final Long startTimestamp;
+private final Long endTimestamp;
+
+public TupleWindowIterImpl(Supplier tuplesIt,
+   Supplier newTuplesIt,
+   Supplier expiredTuplesIt,
+   Long startTimestamp, Long endTimestamp) {
+this.tuplesIt = tuplesIt;
+this.newTuplesIt = newTuplesIt;
+this.expiredTuplesIt = expiredTuplesIt;
+this.startTimestamp = startTimestamp;
+this.endTimestamp = endTimestamp;
+}
+
+@Override
+public List get() {
+List tuples = new ArrayList<>();
+tuplesIt.get().forEachRemaining(t -> tuples.add(t));
+return tuples;
+}
+
+@Override
+public List getNew() {
+throw new UnsupportedOperationException("Not implemented");
--- End diff --

it not straightforward to return the new and expired tuples iterators 
without persisting the tuples separately and maintaining the state, so for now 
its unsupported. The newTuplesIt and expiredTuplesIt receives null values for 
now


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763573
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java
 ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them 
as needed.
+ */
+public class PersistentWindowedBoltExecutor extends 
WindowedBoltExecutor implements IStatefulBolt {
+private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+private final IStatefulWindowedBolt statefulWindowedBolt;
+private transient TopologyContext topologyContext;
+private transient OutputCollector outputCollector;
+private transient WindowState state;
+private transient boolean stateInitialized;
+private transient boolean prePrepared;
+
+public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) {
+super(bolt);
+statefulWindowedBolt = bolt;
+}
+
+@Override
+public void prepare(Map topoConf, TopologyContext 
context, OutputCollector collector) {
+List registrations = (List) 
topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+registrations.add(ConcurrentLinkedQueue.class.getName());
+registrations.add(LinkedList.class.getName());
+registrations.add(AtomicInteger.class.getName());
+registrations.add(EventImpl.class.getName());
+registrations.add(WindowPartition.class.getName());
+registrations.add(DefaultEvictionContext.class.getName());
+topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+prepare(topoConf, context, collector,
+getWindowState(topoConf, context),
+getPartitionState(topoConf, context),
+getWindowSystemState(topoConf, context));
+}
+
+// package access for unit tests
+void prepare(Map topoConf, TopologyContext context, 
OutputCollector collector,
+ KeyValueState 

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763661
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/SimpleWindowPartitionCache.java 
---
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple implementation that evicts the largest un-pinned entry from 
the cache. This works well
+ * for caching window partitions since the access pattern is mostly 
sequential scans.
+ */
+public class SimpleWindowPartitionCache implements 
WindowPartitionCache {
+private static final Logger LOG = 
LoggerFactory.getLogger(SimpleWindowPartitionCache.class);
+
+private final ConcurrentSkipListMap map = new 
ConcurrentSkipListMap<>();
+private final Map pinned = new HashMap<>();
+private final long maximumSize;
+private final RemovalListener removalListener;
+private final CacheLoader cacheLoader;
+private final ReentrantLock lock = new ReentrantLock(true);
+private int size;
+
+@Override
+public V get(K key) {
+return getOrLoad(key, false);
+}
+
+@Override
+public V getPinned(K key) {
+return getOrLoad(key, true);
+}
+
+@Override
+public boolean unpin(K key) {
+LOG.debug("unpin '{}'", key);
+boolean res = false;
+try {
+lock.lock();
+if (pinned.compute(key, (k, v) -> v == null ? 0 : v - 1) <= 0) 
{
+pinned.remove(key);
+res = true;
+}
+} finally {
+lock.unlock();
+}
+LOG.debug("pinned '{}'", pinned);
+return res;
+}
+
+@Override
+public ConcurrentMap asMap() {
+return map;
+}
+
+@Override
+public void invalidate(K key) {
+try {
+lock.lock();
+if (isPinned(key)) {
+LOG.debug("Entry '{}' is pinned, skipping invalidation", 
key);
+} else {
+LOG.debug("Invalidating entry '{}'", key);
+V val = map.remove(key);
+if (val != null) {
+--size;
+pinned.remove(key);
+removalListener.onRemoval(key, val, 
RemovalCause.EXPLICIT);
+}
+}
+} finally {
+lock.unlock();
+}
+}
+
+// Get or load from the cache optionally pinning the entry
+// so that it wont get evicted from the cache
+private V getOrLoad(K key, boolean shouldPin) {
+V val;
+if (shouldPin) {
+try {
+lock.lock();
+val = load(key);
+pin(key);
+} finally {
+lock.unlock();
+}
+} else {
+val = map.get(key);
+if (val == null) {
+try {
+lock.lock();
+val = load(key);
+} finally {
+lock.unlock();
+}
+}
+}
+
+return val;
+}
+
+private V load(K key) {
+V val = map.get(key);
+if (val == null) {
+val = cacheLoader.load(key);
+if (val == null) {
+throw new NullPointerException("Null value");
--- End diff --

  

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763759
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java ---
@@ -127,26 +154,30 @@ private void validate(Map topoConf, 
Count windowLengthCount, Dur
 throw new IllegalArgumentException("Window length is not 
specified");
 }
 
-if (windowLengthDuration != null && slidingIntervalDuration != 
null) {
-ensureDurationLessThanTimeout(windowLengthDuration.value + 
slidingIntervalDuration.value, topologyTimeout);
-} else if (windowLengthDuration != null) {
-ensureDurationLessThanTimeout(windowLengthDuration.value, 
topologyTimeout);
-} else if (slidingIntervalDuration != null) {
-ensureDurationLessThanTimeout(slidingIntervalDuration.value, 
topologyTimeout);
-}
+if (isPersistent()) {
+
ensureCheckpointIntervalLessThanTimeout(getCheckpointIntervalSecs(topoConf), 
topologyTimeout);
--- End diff --

we don't need to validate `message timeout > window length + sliding 
interval` here since the tuples are acked during checkpoint. Hence ensuring the 
`timeout > checkpoint interval` would suffice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763707
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java ---
@@ -102,6 +109,18 @@ private int getMaxSpoutPending(Map 
topoConf) {
 return maxPending;
 }
 
+private int getCheckpointIntervalSecs(Map topoConf) {
--- End diff --

it can be refactored


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763468
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java
 ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them 
as needed.
+ */
+public class PersistentWindowedBoltExecutor extends 
WindowedBoltExecutor implements IStatefulBolt {
+private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+private final IStatefulWindowedBolt statefulWindowedBolt;
+private transient TopologyContext topologyContext;
+private transient OutputCollector outputCollector;
+private transient WindowState state;
+private transient boolean stateInitialized;
+private transient boolean prePrepared;
+
+public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) {
+super(bolt);
+statefulWindowedBolt = bolt;
+}
+
+@Override
+public void prepare(Map topoConf, TopologyContext 
context, OutputCollector collector) {
+List registrations = (List) 
topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+registrations.add(ConcurrentLinkedQueue.class.getName());
+registrations.add(LinkedList.class.getName());
+registrations.add(AtomicInteger.class.getName());
+registrations.add(EventImpl.class.getName());
+registrations.add(WindowPartition.class.getName());
+registrations.add(DefaultEvictionContext.class.getName());
+topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+prepare(topoConf, context, collector,
+getWindowState(topoConf, context),
+getPartitionState(topoConf, context),
+getWindowSystemState(topoConf, context));
+}
+
+// package access for unit tests
+void prepare(Map topoConf, TopologyContext context, 
OutputCollector collector,
+ KeyValueState 

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763520
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java
 ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static 
org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses 
state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The 
tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them 
as needed.
+ */
+public class PersistentWindowedBoltExecutor extends 
WindowedBoltExecutor implements IStatefulBolt {
+private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+private final IStatefulWindowedBolt statefulWindowedBolt;
+private transient TopologyContext topologyContext;
+private transient OutputCollector outputCollector;
+private transient WindowState state;
+private transient boolean stateInitialized;
+private transient boolean prePrepared;
+
+public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) {
+super(bolt);
+statefulWindowedBolt = bolt;
+}
+
+@Override
+public void prepare(Map topoConf, TopologyContext 
context, OutputCollector collector) {
+List registrations = (List) 
topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+registrations.add(ConcurrentLinkedQueue.class.getName());
+registrations.add(LinkedList.class.getName());
+registrations.add(AtomicInteger.class.getName());
+registrations.add(EventImpl.class.getName());
+registrations.add(WindowPartition.class.getName());
+registrations.add(DefaultEvictionContext.class.getName());
+topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+prepare(topoConf, context, collector,
+getWindowState(topoConf, context),
+getPartitionState(topoConf, context),
+getWindowSystemState(topoConf, context));
+}
+
+// package access for unit tests
+void prepare(Map topoConf, TopologyContext context, 
OutputCollector collector,
+ KeyValueState 

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763418
  
--- Diff: 
examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.starter;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An example that demonstrates the usage of {@link 
org.apache.storm.topology.IStatefulWindowedBolt} with window
+ * persistence.
+ * 
+ * The framework automatically checkpoints the tuples in window along with 
the bolt's state and restores the same
+ * during restarts.
+ * 
+ *
+ * 
+ * This topology uses 'redis' for state persistence, so you should also 
start a redis instance before deploying.
+ * If you are running in local mode you can just start a redis server 
locally which will be used for storing the state. The default
+ * RedisKeyValueStateProvider parameters can be overridden by setting 
"topology.state.provider.config", for e.g.
+ * 
+ * {
+ *   "jedisPoolConfig": {
+ * "host": "redis-server-host",
+ * "port": 6379,
+ * "timeout": 2000,
+ * "database": 0,
+ * "password": "xyz"
+ *   }
+ * }
+ * 
+ * 
+ */
+public class PersistentWindowingTopology {
+private static final Logger LOG = 
LoggerFactory.getLogger(PersistentWindowingTopology.class);
+
+// wrapper to hold global and window averages
+private static class Averages {
+private final double global;
+private final double window;
+
+Averages(double global, double window) {
+this.global = global;
+this.window = window;
+}
+
+@Override
+public String toString() {
+return "Averages{" + "global=" + String.format("%.2f", global) 
+ ", window=" + String.format("%.2f", window) + '}';
+}
+}
+
+/**
+ * A bolt that uses stateful persistence to store the windows along 
with the state (global avg).
+ */
+private static class AvgBolt extends 
BaseStatefulWindowedBolt>> {
+private static final String STATE_KEY = "avg";
+
+private OutputCollector collector;
+private KeyValueState> state;
+private Pair globalAvg;
+
+@Override
+public void prepare(Map topoConf, TopologyContext 
context, OutputCollector collector) {
+this.collector = collector;
+}
+
+@Override
+public void initState(KeyValueState> 
state) {
+this.state = state;
+globalAvg = state.get(STATE_KEY, Pair.of(0L, 0L));
+LOG.info("initState with global avg [" + (double) globalAvg._1 
/ globalAvg._2 + "]");
--- End diff --

_1 and _2 are exposed to be consistent with other `TupleN` classes. Will 
use the getters here.


---
If 

[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763373
  
--- Diff: docs/Windowing.md ---
@@ -266,3 +266,105 @@ tuples can be received within the timeout period.
 An example toplogy `SlidingWindowTopology` shows how to use the apis to 
compute a sliding window sum and a tumbling window 
 average.
 
+## Stateful windowing
+The default windowing implementation in storm stores the tuples in memory 
until they are processed and expired from the 
+window. This limits the use cases to windows that
+fit entirely in memory. Also the source tuples cannot be ack-ed until the 
window expiry requiring large message timeouts
+(topology.message.timeout.secs should be larger than the window length + 
sliding interval). This also puts extra loads 
+due to the complex acking and anchoring requirements.
+ 
+To address the above limitations and to support larger window sizes, storm 
provides stateful windowing support via `IStatefulWindowedBolt`. 
+User bolts should typically extend `BaseStatefulWindowedBolt` for the 
windowing operations with the framework automatically 
+managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of 
the message, the framework can use this
+to periodically checkpoint the last expired and evaluated message ids, to 
avoid duplicate window evaluations in case of 
+failures or restarts. During recovery, the tuples with message ids lower 
than last expired id are discarded and tuples with 
+message id between the last expired and last evaluated message ids are fed 
into the system without activating any triggers. 
+The tuples beyond the last evaluated message ids are processed as usual. 
This can be enabled by setting
+the `messageIdField` as shown below,
+
+```java
+topologyBuilder.setBolt("mybolt",
+   new MyStatefulWindowedBolt()
+   .withWindow(...) // windowing configuarations
+   .withMessageIdField("msgid"), // a monotonically 
increasing 'long' field in the tuple
+   parallelism)
+   .shuffleGrouping("spout");
+```
+
+However, this option is feasible only if the sources can provide a 
monotonically increasing identifier in the tuple and the same is maintained
+while re-emitting the messages in case of failures. Here the tuples in 
window are still held in memory.
+
+For more details take a look at the sample topology in storm starter 
`StatefulWindowingTopology` which will help you get started.
--- End diff --

yes, will link to the file


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763340
  
--- Diff: docs/Windowing.md ---
@@ -266,3 +266,105 @@ tuples can be received within the timeout period.
 An example toplogy `SlidingWindowTopology` shows how to use the apis to 
compute a sliding window sum and a tumbling window 
 average.
 
+## Stateful windowing
+The default windowing implementation in storm stores the tuples in memory 
until they are processed and expired from the 
+window. This limits the use cases to windows that
+fit entirely in memory. Also the source tuples cannot be ack-ed until the 
window expiry requiring large message timeouts
+(topology.message.timeout.secs should be larger than the window length + 
sliding interval). This also puts extra loads 
+due to the complex acking and anchoring requirements.
+ 
+To address the above limitations and to support larger window sizes, storm 
provides stateful windowing support via `IStatefulWindowedBolt`. 
+User bolts should typically extend `BaseStatefulWindowedBolt` for the 
windowing operations with the framework automatically 
+managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of 
the message, the framework can use this
+to periodically checkpoint the last expired and evaluated message ids, to 
avoid duplicate window evaluations in case of 
+failures or restarts. During recovery, the tuples with message ids lower 
than last expired id are discarded and tuples with 
+message id between the last expired and last evaluated message ids are fed 
into the system without activating any triggers. 
--- End diff --

yes its the trigger policy instances. will reword.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...

2017-07-27 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2218#discussion_r129763359
  
--- Diff: docs/Windowing.md ---
@@ -266,3 +266,105 @@ tuples can be received within the timeout period.
 An example toplogy `SlidingWindowTopology` shows how to use the apis to 
compute a sliding window sum and a tumbling window 
 average.
 
+## Stateful windowing
+The default windowing implementation in storm stores the tuples in memory 
until they are processed and expired from the 
+window. This limits the use cases to windows that
+fit entirely in memory. Also the source tuples cannot be ack-ed until the 
window expiry requiring large message timeouts
+(topology.message.timeout.secs should be larger than the window length + 
sliding interval). This also puts extra loads 
+due to the complex acking and anchoring requirements.
+ 
+To address the above limitations and to support larger window sizes, storm 
provides stateful windowing support via `IStatefulWindowedBolt`. 
+User bolts should typically extend `BaseStatefulWindowedBolt` for the 
windowing operations with the framework automatically 
+managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of 
the message, the framework can use this
+to periodically checkpoint the last expired and evaluated message ids, to 
avoid duplicate window evaluations in case of 
+failures or restarts. During recovery, the tuples with message ids lower 
than last expired id are discarded and tuples with 
+message id between the last expired and last evaluated message ids are fed 
into the system without activating any triggers. 
+The tuples beyond the last evaluated message ids are processed as usual. 
This can be enabled by setting
+the `messageIdField` as shown below,
+
+```java
+topologyBuilder.setBolt("mybolt",
+   new MyStatefulWindowedBolt()
+   .withWindow(...) // windowing configuarations
+   .withMessageIdField("msgid"), // a monotonically 
increasing 'long' field in the tuple
+   parallelism)
+   .shuffleGrouping("spout");
+```
+
+However, this option is feasible only if the sources can provide a 
monotonically increasing identifier in the tuple and the same is maintained
+while re-emitting the messages in case of failures. Here the tuples in 
window are still held in memory.
--- End diff --

will reword: With this option the tuples are still buffered in memory until 
processed and expired from the window.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---