[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r130010442

--- Diff: storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java ---
@@ -39,7 +38,7 @@ public KryoTupleDeserializer(final Map conf, final GeneralTopolo
         _kryoInput = new Input(1);
     }
-    public Tuple deserialize(byte[] ser) {
+    public TupleImpl deserialize(byte[] ser) {
--- End diff --

Thanks Satish for clarifying. :)

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r130010224

--- Diff: storm-client/src/jvm/org/apache/storm/topology/SimpleWindowPartitionCache.java ---
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple implementation that evicts the largest un-pinned entry from the cache. This works well
+ * for caching window partitions since the access pattern is mostly sequential scans.
+ */
+public class SimpleWindowPartitionCache implements WindowPartitionCache {
--- End diff --

It is good to have unit tests, including ones with multiple threads.
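The eviction policy described in that javadoc (evict the largest un-pinned entry; access is mostly sequential scans) can be sketched independently of the class under review. The sketch below is a hypothetical, simplified illustration using a `ConcurrentSkipListMap`, not the actual SimpleWindowPartitionCache code; all class and method names are made up for the example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: a bounded cache whose keys are kept sorted, and which
// evicts the entry with the LARGEST key that is not currently pinned.
public class LargestUnpinnedEviction<K extends Comparable<K>, V> {
    private final ConcurrentSkipListMap<K, V> map = new ConcurrentSkipListMap<>();
    private final Set<K> pinned = ConcurrentHashMap.newKeySet();
    private final int maxSize;

    public LargestUnpinnedEviction(int maxSize) {
        this.maxSize = maxSize;
    }

    public void pin(K key) { pinned.add(key); }
    public void unpin(K key) { pinned.remove(key); }

    public void put(K key, V value) {
        map.put(key, value);
        while (map.size() > maxSize) {
            // Walk down from the largest key until an un-pinned entry is found.
            K candidate = map.lastKey();
            while (candidate != null && pinned.contains(candidate)) {
                candidate = map.lowerKey(candidate);
            }
            if (candidate == null) {
                break; // everything is pinned; nothing can be evicted
            }
            map.remove(candidate);
        }
    }

    public V get(K key) { return map.get(key); }

    public static void main(String[] args) {
        LargestUnpinnedEviction<Integer, String> cache = new LargestUnpinnedEviction<>(2);
        cache.put(1, "a");
        cache.pin(3);
        cache.put(3, "c");
        cache.put(2, "b"); // over capacity: 3 is pinned, so 2 (largest un-pinned) is evicted
        System.out.println(cache.get(2)); // null
        System.out.println(cache.get(1)); // a
        System.out.println(cache.get(3)); // c
    }
}
```

A multi-threaded unit test of the kind requested above would hammer `put`/`pin`/`unpin` from several threads and assert that pinned entries are never evicted.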
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r130009889

--- Diff: storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java ---
@@ -39,7 +38,7 @@ public KryoTupleDeserializer(final Map conf, final GeneralTopolo
         _kryoInput = new Input(1);
     }
-    public Tuple deserialize(byte[] ser) {
+    public TupleImpl deserialize(byte[] ser) {
--- End diff --

I guess this has been possible since JDK 1.5.
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r130009804

--- Diff: storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java ---
@@ -39,7 +38,7 @@ public KryoTupleDeserializer(final Map conf, final GeneralTopolo
         _kryoInput = new Input(1);
     }
-    public Tuple deserialize(byte[] ser) {
+    public TupleImpl deserialize(byte[] ser) {
--- End diff --

Thanks! Good to know. Do you know which version of the JDK made this possible? Was it supported from the very first version of Java?
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user arunmahadevan commented on the issue: https://github.com/apache/storm/pull/2241

Agree with @HeartSaVioR. If possible, let's break this down into multiple patches, e.g. (1) JCQ replacing Disruptor, (2) changing the threading model, (3) micro-optimizations, and so on, which makes it easier to review and benchmark.
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r130009128

--- Diff: storm-client/src/jvm/org/apache/storm/serialization/KryoTupleDeserializer.java ---
@@ -39,7 +38,7 @@ public KryoTupleDeserializer(final Map conf, final GeneralTopolo
         _kryoInput = new Input(1);
    }
-    public Tuple deserialize(byte[] ser) {
+    public TupleImpl deserialize(byte[] ser) {
--- End diff --

It works because Java supports covariant return types. But I am not sure why this was changed, since I don't see it being used to avoid a downcast anywhere in the patch.
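For readers following along, covariant return types (the JDK 5 feature being discussed here) can be demonstrated with a small standalone sketch: an overriding method may narrow its declared return type to a subtype, so callers holding the concrete class avoid a downcast. The `Tuple`/`TupleImpl`/deserializer names below are simplified stand-ins for the Storm classes, not the real implementations.

```java
// Minimal illustration of covariant return types (legal since JDK 5).
interface Tuple {}

class TupleImpl implements Tuple {}

interface Deserializer {
    Tuple deserialize(byte[] ser);
}

class KryoLikeDeserializer implements Deserializer {
    @Override
    public TupleImpl deserialize(byte[] ser) { // narrowed return type: allowed
        return new TupleImpl();
    }
}

public class CovariantReturnDemo {
    public static void main(String[] args) {
        // Through the interface, callers still see Tuple.
        Deserializer d = new KryoLikeDeserializer();
        Tuple viaInterface = d.deserialize(new byte[0]);

        // Through the concrete class, no downcast is needed.
        TupleImpl direct = new KryoLikeDeserializer().deserialize(new byte[0]);

        System.out.println(viaInterface instanceof TupleImpl); // true
        System.out.println(direct != null);                    // true
    }
}
```

As the review comment notes, the narrowing only pays off if some caller actually uses the concrete type; otherwise the signature change is cosmetic.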
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2241

Just leaving a note to make my requirements clear (they are quite simple):
- The new system doesn't break anything that worked before. If breaking changes are unavoidable, they should be discussed with the Storm community; in the worst case we may decide to disallow the break.
- We should provide default values for the relevant variables that make most topologies happy. For this patch, that means showing higher throughput and lower latency compared to the defaults on the master branch.
- (Optionally) we may want to provide specific values that make benchmark topologies (or user topologies which run at full speed all the time) even happier.

We may also want to document how the parameters work, how to tune them, and good starting values for several use cases, so that users can tweak their own topologies and find good values for them. I guess we didn't do that before, but it would be great to have.

Please let me know when this patch achieves these requirements. I'd rather treat this as WIP and just wait for that.
[GitHub] storm issue #2200: STORM-2616: Documentation for built in metrics
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2200

+1 Nice documentation!
[GitHub] storm issue #2245: Restore issues missing from changelog since fb2446075de20...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2245

As you may know, you could even have just pushed the change directly, since it was wrong. Now that you also find the CHANGELOG hard to maintain, let's see how the discussion goes.
[GitHub] storm issue #2245: Restore issues missing from changelog since fb2446075de20...
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2245

+1 You can merge this without waiting the 24 hours.
Re: [DISCUSS] Remove CHANGELOG file
correction: other projects -> *some* other projects, though they're popular projects (including competitors)

On Fri, Jul 28, 2017 at 10:51 AM, Jungtaek Lim wrote:
> [quoted text trimmed]
Re: [DISCUSS] Remove CHANGELOG file
I'm happy that there are other people having the same difficulty and sharing the same feeling.

This discussion has been initiated several times (by me) and got some +1s in each thread, but never reached actual work.

We already utilize JIRA, and I'm subscribed to issues@ and taking care of issues that were forgotten to be marked resolved and/or labeled with fixed versions. It may sound ideal to let reporters take care of their own issues, but committers can also help with that; in fact, the merger is responsible for resolving the issue, so this side doesn't fall on the contributor.

My other consideration is which approach is more convenient for the release manager. Taylor has taken the release manager role all along (thanks for the great work!), and this is directly related to the release announcement, so I would like to hear his opinion. If it is more convenient, or he thinks he can tolerate it, we can just go on.

Please note that other projects don't use merge commits. Most of the time they squash the commits in a PR into one, label the commit title with the JIRA issue, and let the commit list serve as the CHANGELOG. That's another thing we discussed earlier and I think we need to discuss again, but it can be discussed in another thread.

Regarding maintaining contributors: easy to explain. Just take a look at what Spark has been doing. Some other projects follow the same approach.

We can run a script to extract the authors of git commits and just " | sort | uniq", and we're done. Pulling assignees from JIRA issues may be more accurate, since that requires an actual account, whereas author information in a commit is not strictly required to identify someone. We can apply a hybrid approach as well, but as a starter just following git commits looks OK to me.

IMHO contributors don't feel strong pride merely from being listed in a contributors file. Looking at the contribution graph works better in this case, given that it also shows commit counts and lines of change (regardless of accuracy). Mentioning them in the release announcement may give more pride. It will lead contributors to participate consistently, trying to be mentioned in as many releases as possible. IMO Spark built a great strategy on this front, and if we all think it is great, why not follow?

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Jul 28, 2017 at 6:58 AM, Stig Rohde Døssing wrote:
> [quoted text trimmed]
Re: Request for holding off commits to storm-client module
Now the pull request for STORM-2306 is up, and it looks like it will take several weeks (months?) to be verified.

IMHO the effort of rebasing is unavoidable (though we could put in effort to minimize it), especially for pull requests that were already up. Unfortunately, it comes down to who takes the load, and I don't want to ask authors who have been waiting weeks or even months to get their PRs merged to do the rebasing as a favor. So, at least for pull requests filed before this patch, I don't see a reason to block putting them into the storm-client module right now.

Opinions?

- Jungtaek Lim (HeartSaVioR)

On Mon, Jul 24, 2017 at 4:30 PM, Roshan Naik wrote:

> I am fully OK with rebasing. I don't expect development on Storm to cease
> while the PR gets reviewed. It's only changes to storm-client that are likely
> to need to be done differently, or not be needed at all, for the new system
> (like some fixes that went into Disruptor recently). So only for changes to
> the storm-client module am I asking the devs to see if they can work with me
> when it's critical to get something in right away… only to avoid duplicated
> or redundant efforts.
>
> Will have a PR out within the next 24 hours.
>
> -roshan
>
> On 7/23/17, 7:57 PM, "Jungtaek Lim" wrote:
>
>     Hi Roshan,
>
>     I guess rebasing could be necessary even while in the review phase, so
>     personally I'd rather see the PR first, even if it has merge conflicts
>     with the origin master branch, run the review process on the changes in
>     the PR, and address conflicts once the review process is done.
>
>     If you want to hold off commits just before you submit a pull request,
>     and you expect to submit it in several days, I'm also OK to wait for
>     that. I just would like to ensure the hold doesn't last longer.
>
>     Thanks,
>     Jungtaek Lim (HeartSaVioR)
>
>     On Mon, Jul 24, 2017 at 7:40 AM, Roshan Naik wrote:
>
>     > Storm Devs,
>     > My PR for STORM-2306 (messaging subsystem redesign) is almost ready.
>     > Not surprisingly, this PR brings extensive modifications to the
>     > storm-client module and touches lightly on some others.
>     >
>     > I just finished manually rebasing my local changes (to ~90 files) from
>     > an old version of master onto the latest master this Friday… but
>     > shortly after I was done, more commits came into storm-client and
>     > needed manual reconciling, so it's a bit difficult to keep up.
>     >
>     > So I would like to request that committers and contributors looking to
>     > make changes in this critical module either wait for this PR to go
>     > through or work with me to see if we can work something out if it's
>     > critical.
>     >
>     > Changes related to core areas like Disruptor, Metrics, Credential
>     > updates, Worker, Executor, Task, Bolt/Spout output
>     > collectors/executors, ACKing, etc. are areas with a very high
>     > likelihood of merge conflicts.
>     >
>     > If there are better alternative ideas, I'm happy to consider them.
>     > Thanks for understanding,
>     >
>     > -roshan
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2241

Btw, I think discussing the current state is not very meaningful. This patch has lots of TODOs and some critical identified issues that should be addressed first; after that the numbers are going to be really different. We may end up arguing the same things again and again, so it may be better to hold off until this patch is really ready for review (not WIP).
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2241

I don't think using the metrics consumer in TVL is the issue: it might matter if the results were close, so that the contribution of other system components mattered, but this is simply unacceptable latency for a low rate. There is a huge gap between the two. Let's say we get rid of the metrics consumer and that makes things stable; are we then going to persuade users not to use the metrics consumer? That doesn't make sense. I don't think it does, but if we believe the metrics consumer affects throughput and/or latency in a really odd way, that needs to be validated and fixed.

As you can see from my results, CPU was over 100% even at rate 500, and the total task count of the three key components was 12 (not 48, please keep that in mind). All TVL results were captured that way. So this patch shows high CPU usage at baseline (say, minimal load) and fluctuates around 80% even at rate 1, whereas the master branch was at 20%.
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on the issue: https://github.com/apache/storm/pull/2241

@revans2 About that "better than sliced bread": how could I not be offended.. at least briefly ;-) but you could buy me lunch if this PR turns out better than you initially feared. It was perhaps the lowest-latency -1 for any PR in the history of Storm. :-) Yes, there are rough edges and some bugs... but I do dream of being able to nail it all the way in one go.

About your observation of **very high latency for low-throughput** topos: that is clearly a problem of batches not filling up and not getting flushed. The 5 sec latency corresponds to the 'topology.flush.tuple.freq.millis' setting (default 5 sec). So if, at each step between Spout->Bolt and Bolt->Bolt, it is waiting for 5 sec, then you are likely to see such ridiculous latency numbers. Given that, I think the solution must be evident by now... but I will state it here for the benefit of other readers to whom it may not be. Tweak one or both of these two settings:
- **topology.producer.batch.size**: for low-throughput topos, setting this to 1 is a good idea. In the new system the throughput penalty for a small batch is not that large compared to a bigger batch size.
- **topology.flush.tuple.freq.millis**: you could reduce this for more frequent batch flushing. It causes the timer thread to take up more CPU... but if throughput fluctuates between very high and very low over the day, then this setting may be the better way to meet a latency SLA.
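For illustration, the two knobs named above could be collected into a topology config override. This is a hedged sketch: the config key strings come from the comment itself, but the class and method names below are made up for the example, and the chosen values are just the advice restated, not official defaults.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds the low-latency overrides described above.
public class LowLatencyTuning {
    public static Map<String, Object> lowLatencyOverrides() {
        Map<String, Object> conf = new HashMap<>();
        // Batch size 1: effectively disables producer-side batching, so tuples
        // are never stuck waiting for a batch to fill at low throughput.
        conf.put("topology.producer.batch.size", 1);
        // Flush pending batches more often than the 5000 ms default, at the
        // cost of some extra timer-thread CPU.
        conf.put("topology.flush.tuple.freq.millis", 1000);
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(lowLatencyOverrides());
    }
}
```

In a real topology these entries would be merged into the `Config` map submitted with the topology (or passed via `-c key=value` on the storm CLI, as in the TVL commands elsewhere in this thread).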
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user harshach commented on the issue: https://github.com/apache/storm/pull/2241

@revans2 I am trying to reproduce the worst case in your last chart, running the TVL topology with 4 spouts, 10 splitters, 4 counters, and 2 ackers. Here is the code: https://gist.github.com/harshach/73dae347c178ac5dd8651cb0e7902412

I am running it via the following command against master and STORM-2306:

`/bin/storm jar /tmp/storm-starter-2.0.0-SNAPSHOT.jar org.apache.storm.starter.ThroughputVsLatency 500 1 -c topology.workers=1 -c topology.max.spout.pending=500 -c topology.acker.executors=2`

You can look at my results in **sheet 2** here: https://docs.google.com/spreadsheets/d/1wPpC3YXp-vTIelRTUVoLxuxIYxUekiIHUpZ2ZEysC4Y/edit#gid=1239810430

What I see is not much difference between master and STORM-2306. Let me know if I am missing something in running this test.
Re: [DISCUSS] Remove CHANGELOG file
We already have to keep JIRA updated, and keeping JIRA consistent is easier since there isn't one view of the resolved issues for each git branch like we have with CHANGELOG, so there's no worry about e.g. master having a different opinion on solved issues in 1.2.0 than 1.x-branch has.

I think we already have the guideline that only small (e.g. typo) changes are okay without a JIRA issue. We're already encouraging one commit per issue; most of the PRs I've seen recently have been squashed before merge. Is this not your experience?

I think we have the contributors/committers lists on SVN as well, for generating http://storm.apache.org/contribute/People.html at https://svn.apache.org/repos/asf/storm/site/_data/. I think Jungtaek was suggesting keeping the committers list and generating the contributors list for each release from either commit authors or JIRA assignees, but he can probably elaborate better.

2017-07-27 23:06 GMT+02:00 Hugo Da Cruz Louro:

> I am +1 for discontinuing CHANGELOG. However, using JIRA to compile this
> info will only work if contributors are very disciplined and consistent in
> updating JIRA. That leads to the question: is it any easier to keep JIRA
> consistent than it is to keep CHANGELOG consistent? A clean and consistent
> JIRA is ideal, as it will also make it easy to create reports for
> individual components, etc.
>
> This discussion touches a proposal I suggested a while ago, that the Storm
> community should have a more strict and consistent git log guideline. In
> short, besides very trivial changes, like typos or one- or two-line
> changes, every feature or bug should be associated with a JIRA.
> Furthermore, one commit should correspond to one JIRA, and one JIRA should
> be solved by one commit. That means we should focus on ensuring that
> commits are squashed and that their titles really reflect the issue they
> address, etc.
>
> As for the contributors and committers list.
If we remove those lists,
> where will this information be kept?
>
> Hugo
>
> > On Jul 27, 2017, at 1:44 PM, Stig Rohde Døssing wrote:
> >
> > Sorry to necro this thread, but I think it's worth bringing this issue up
> > again. As Jungtaek mentioned, a manual changelog is easy to break, and it
> > looks like some issues are listed wrongly on master and missing from 1.x (
> > https://github.com/apache/storm/pull/2245)
> >
> > I think dropping CHANGELOG is a great idea, and we might use Kafka as an
> > example for how to do release notes. They have JIRA generate the notes and
> > put them up alongside each release, for instance
> > https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html. It's a
> > much nicer overview of changes than what we have in CHANGELOG, where the
> > issue types are mixed, and a manual changelog is easier to get out of sync
> > with JIRA. We could probably configure JIRA to generate something similar
> > with a template (guide:
> > https://confluence.atlassian.com/adminjiraserver073/creating-release-notes-861253367.html
> > ).
> >
> > 2017-03-10 8:31 GMT+01:00 Jungtaek Lim:
> >
> >> This proposal will make the merge process simpler, so I guess
> >> committers/PMCs might not have strong opinions about it. I'm mainly
> >> concerned about how it will affect the release process.
> >>
> >> Taylor, I guess you're busy, but could you give your opinion about this
> >> as a release manager? Would removing CHANGELOG hurt or break the release
> >> process?
> >>
> >> And do we need to vote to make progress?
> >>
> >> Thanks,
> >> Jungtaek Lim (HeartSaVioR)
> >>
> >> On Fri, Mar 10, 2017 at 3:08 PM, Xin Wang wrote:
> >>
> >>> LGTM. I give a +1 for the idea.
> >>>
> >>> 2017-03-07 9:29 GMT+08:00 Jungtaek Lim:
> >>>
> >>>> Bump. I think it's not that trivial for the code merger and release
> >>>> manager, and even for contributors (how to represent their
> >>>> contributions).
> >>>>
> >>>> On Fri, Feb 24, 2017 at 9:43 AM, Roshan Naik wrote:
> >>>>
> >>>>> Sounds like a good idea to me.
> >>>>> -roshan
> >>>>>
> >>>>> On 2/23/17, 4:41 PM, "Jungtaek Lim" wrote:
> >>>>>
> >>>>> Hi devs,
> >>>>>
> >>>>> I guess we discussed this before, but it didn't move to actual work.
> >>>>>
> >>>>> I'd like to propose again removing the CHANGELOG file from the
> >>>>> repository and using the JIRA issues' fixed version(s) instead.
> >>>>>
> >>>>> A CHANGELOG maintained as a file is really easy to break. I've seen
> >>>>> it broken several times, mostly around backports. The CHANGELOG files
> >>>>> between branches are inconsistent.
> >>>>>
> >>>>> Suppose we would like to backport to 1.0.x an issue which was only
> >>>>> applied to 2.0.0; then we have to fix the CHANGELOG on three
> >>>>> branches. Easy to miss, and redundant.
> >>>>>
> >>>>> I'd also like to remove Project leads / Committers / Contributors
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user harshach commented on the issue: https://github.com/apache/storm/pull/2241 @revans2 I am trying to reproduce the worst case in your last chart. Running the TVL topology with 4 spouts, 10 splitters, 4 counters, and 2 ackers. Here is the code https://gist.github.com/harshach/73dae347c178ac5dd8651cb0e7902412 Running it via the following command against master and STORM-2306 `/bin/storm jar /tmp/storm-starter-2.0.0-SNAPSHOT.jar org.apache.storm.starter.ThroughputVsLatency 500 1 -c topology.workers=1 -c topology.max.spout.pending=500 -c topology.acker.executors=2` You can look at my results here https://docs.google.com/spreadsheets/d/1wPpC3YXp-vTIelRTUVoLxuxIYxUekiIHUpZ2ZEysC4Y/edit#gid=1239810430 in **sheet 2**. What I see is not much difference between master and STORM-2306. Let me know if I am missing something in running this test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on the issue: https://github.com/apache/storm/pull/2241 Some points covering previous comments by @HeartSaVioR and @revans2 **Throughput limiting:** That only makes sense if you are measuring throughput vs CPU/other resource usage. Latency measurements do not need it. And it's a sin if you are doing that when trying to measure throughput. **TVL topology:** - Given its rate-limiting nature, it definitely does not have the right name. Its employment of very high thread counts and rate-limiting spouts appears to be tuned to work within the limitations of the current messaging system and target the old sweet spot. Deserves a question. Harsha's measurements (which are more sensible in terms of executor counts) show that the current messaging system was brought to its knees very quickly once the rate limiting went away. @revans2 The drop you are seeing with the increase in splitter counts is indicative of the increased CPU contention going on even when there is not enough data flowing through an executor (the issue you initially brought up... of high CPU usage for idle topos). In the old system, executors seem to spend more time sleeping when there is insufficient data flow, so there is less CPU contention, and adding redundant/idle executors does not affect it as much. So you see throughput plateaus. Lowering the CPU contention in idle mode is something I plan to address... and I think I have left some TODOs for myself in the code already to keep me honest.
[GitHub] storm pull request #476: STORM-720: Storm.cmd should return ERRORLEVEL befor...
Github user rtandonmsft closed the pull request at: https://github.com/apache/storm/pull/476
Re: [DISCUSS] Remove CHANGELOG file
I am +1 for discontinuing CHANGELOG. However, using JIRA to compile this info will only work if contributors are very disciplined and consistent in updating JIRA. That leads to the question: is it any easier to keep JIRA consistent than it is to keep CHANGELOG consistent? A clean and consistent JIRA is ideal, as it will also make it easy to create reports for individual components, etc. This discussion touches a proposal I suggested a while ago, that the Storm community should have a stricter and more consistent Git log guideline. In short, aside from very trivial changes, like typos or one- or two-line changes, every feature or bug should be associated with a JIRA. Furthermore, one commit should correspond to one JIRA, and one JIRA should be solved by one commit. That means we should focus on ensuring that commits are squashed, and that their titles really reflect the issue they address, etc. As for the contributors and committers lists: if we remove those lists, where will this information be kept? Hugo > On Jul 27, 2017, at 1:44 PM, Stig Rohde Døssing wrote: > > Sorry to necro this thread, but I think it's worth bringing this issue up > again. As Jungtaek mentioned a manual changelog is easy to break, and it > looks like some issues are listed wrong on master and missing from 1.x ( > https://github.com/apache/storm/pull/2245) > > I think dropping CHANGELOG is a great idea, and we might use Kafka as an > example for how to do release notes. They have JIRA generate the notes and > put them up alongside each release, for instance > https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html. It's a > much nicer overview of changes than what we have in CHANGELOG where the > issue types are mixed, and a manual changelog is easier to get out of sync > with JIRA. We could probably configure JIRA to generate something similar > with a template (guide: > https://confluence.atlassian.com/adminjiraserver073/creating-release-notes-861253367.html > ). 
> > 2017-03-10 8:31 GMT+01:00 Jungtaek Lim : > >> This propose will make merge process simpler, so I guess committers/PMCs >> might not have strong opinions about this. I'm concerning mainly about how >> it will affect release process. >> >> Taylor, I guess you're busy, but could you give your opinion about this as >> a release manager? Would removing CHANGELOG hurt or break release process? >> >> And do we need to vote to make progress? >> >> Thanks, >> Jungtaek Lim (HeartSaVioR) >> >> 2017년 3월 10일 (금) 오후 3:08, Xin Wang 님이 작성: >> >>> LGTM. I give a +1 for the idea. >>> >>> 2017-03-07 9:29 GMT+08:00 Jungtaek Lim : >>> Bump. I think it's not that trivial for code merger and release >> manager, and even contributors (how to represent their contributions.) 2017년 2월 24일 (금) 오전 9:43, Roshan Naik 님이 작성: > Sounds like a good idea to me. > -roshan > > On 2/23/17, 4:41 PM, "Jungtaek Lim" wrote: > >Hi devs, > >I guess we discussed about this before, but didn't move to actual work. > >I'd like to propose again removing CHANGELOG file in repository, >>> and > use >JIRA issue's fixed version(s). > >Maintaining CHANGELOG to file is really easy to break. I've seen > several >times and most of them is about backport. CHANGELOG file between > branches >are inconsistent. > >Suppose we would like to backport the issue to 1.0.x which is >> only > applied >to 2.0.0, then we should fix CHANGELOG from three branches. Easy >> to > miss >and redundant. > >I'd also like to remove Project leads / Committers / Contributors >>> in > README >(at least Contributors) since it's also easy to break. > >For PMC members we're maintaining it to website and I think >> that's > enough. >For contributors I love what other projects are doing: extract >>> unique >contributors name from commits or JIRA issues of release version >>> and >mention them from release announce note. > >What do you think? > >Thanks, >Jungtaek Lim (HeartSaVioR) > > > >>> >>
Re: [DISCUSS] Remove CHANGELOG file
Sorry to necro this thread, but I think it's worth bringing this issue up again. As Jungtaek mentioned a manual changelog is easy to break, and it looks like some issues are listed wrong on master and missing from 1.x ( https://github.com/apache/storm/pull/2245) I think dropping CHANGELOG is a great idea, and we might use Kafka as an example for how to do release notes. They have JIRA generate the notes and put them up alongside each release, for instance https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html. It's a much nicer overview of changes than what we have in CHANGELOG where the issue types are mixed, and a manual changelog is easier to get out of sync with JIRA. We could probably configure JIRA to generate something similar with a template (guide: https://confluence.atlassian.com/adminjiraserver073/creating-release-notes-861253367.html ). 2017-03-10 8:31 GMT+01:00 Jungtaek Lim: > This propose will make merge process simpler, so I guess committers/PMCs > might not have strong opinions about this. I'm concerning mainly about how > it will affect release process. > > Taylor, I guess you're busy, but could you give your opinion about this as > a release manager? Would removing CHANGELOG hurt or break release process? > > And do we need to vote to make progress? > > Thanks, > Jungtaek Lim (HeartSaVioR) > > 2017년 3월 10일 (금) 오후 3:08, Xin Wang 님이 작성: > > > LGTM. I give a +1 for the idea. > > > > 2017-03-07 9:29 GMT+08:00 Jungtaek Lim : > > > > > Bump. I think it's not that trivial for code merger and release > manager, > > > and even contributors (how to represent their contributions.) > > > > > > 2017년 2월 24일 (금) 오전 9:43, Roshan Naik 님이 작성: > > > > > > > Sounds like a good idea to me. > > > > -roshan > > > > > > > > On 2/23/17, 4:41 PM, "Jungtaek Lim" wrote: > > > > > > > > Hi devs, > > > > > > > > I guess we discussed about this before, but didn't move to actual > > > work. 
> > > > > > > > I'd like to propose again removing CHANGELOG file in repository, > > and > > > > use > > > > JIRA issue's fixed version(s). > > > > > > > > Maintaining CHANGELOG to file is really easy to break. I've seen > > > > several > > > > times and most of them is about backport. CHANGELOG file between > > > > branches > > > > are inconsistent. > > > > > > > > Suppose we would like to backport the issue to 1.0.x which is > only > > > > applied > > > > to 2.0.0, then we should fix CHANGELOG from three branches. Easy > to > > > > miss > > > > and redundant. > > > > > > > > I'd also like to remove Project leads / Committers / Contributors > > in > > > > README > > > > (at least Contributors) since it's also easy to break. > > > > > > > > For PMC members we're maintaining it to website and I think > that's > > > > enough. > > > > For contributors I love what other projects are doing: extract > > unique > > > > contributors name from commits or JIRA issues of release version > > and > > > > mention them from release announce note. > > > > > > > > What do you think? > > > > > > > > Thanks, > > > > Jungtaek Lim (HeartSaVioR) > > > > > > > > > > > > > > > > > >
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on the issue: https://github.com/apache/storm/pull/2241 For the new messaging system... the scaling rule of thumb I have found so far is quite simple. For fast topos (and CPU-intensive topos)... 1 executor thread per *physical core*. It applies to ACKer executors as well. Avoid trying to max out on logical cores / hyperthreads. You are likely to be close to getting the most out of your hardware with that rule. You can start with that and try adding/removing one or more executors to see if you can squeeze out more. The older system will typically need more executors per machine to get similar numbers (throughput usage)... but throughput may not come close to the new system. The rule for executor count vs CPU cores in the existing messaging system seems less simple to me. Trying to run 51 executors on a 4-core machine will surely be a step towards "de-scaling", if there is such a word. It is strange that such a high executor count was useful in the current system.
[GitHub] storm pull request #2245: Restore issues missing from changelog since fb2446...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2245 Restore issues missing from changelog since fb2446075de202387658b0cc8… …434ee53ded5d35a See https://github.com/apache/storm/commit/fb2446075de202387658b0cc8434ee53ded5d35a I've grepped the log for the added issue numbers on 1.1.x-branch and the associated commits appear to be in 1.x but not 1.1.x, so they likely belong in 1.2.0. On master these issues are listed in 2.0.0, which is maybe also wrong(?) The two removed issues are duplicates: 2432 isn't fixed in 1.0.0 based on the 1.0.0 tag, and 2343 is listed twice in 1.1.1. ping @HeartSaVioR You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm restore-changelog-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2245.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2245 commit 4ca12ff5c806cfc3991cc9e8ff6cf5a7715b37ce Author: Stig Rohde Døssing Date: 2017-07-27T19:50:10Z Restore issues missing from changelog since fb2446075de202387658b0cc8434ee53ded5d35a
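[Editorial sketch] The grep-the-log cross-check described above can be mechanized. The following is a minimal, hypothetical Java helper (not part of Storm) that pulls STORM-NNNN issue ids out of commit subjects, so the ids present on two branches can be diffed against the changelog; the sample subjects in the usage are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IssueIdExtractor {
    // Matches JIRA issue ids like STORM-2306 anywhere in a commit subject.
    private static final Pattern ISSUE = Pattern.compile("STORM-\\d+");

    /** Collect every STORM-NNNN id found in the given commit subjects, in order. */
    public static List<String> extract(List<String> commitSubjects) {
        List<String> ids = new ArrayList<>();
        for (String subject : commitSubjects) {
            Matcher m = ISSUE.matcher(subject);
            while (m.find()) {
                ids.add(m.group());
            }
        }
        return ids;
    }
}
```

Feeding this the output of `git log --format=%s branchA..branchB` (one subject per list element) would yield the ids to compare against CHANGELOG entries.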
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129946825 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -548,22 +518,14 @@ public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks"; /** - * The size of the Disruptor receive queue for each executor. Must be a power of 2. + * The size of the receive queue for each executor. */ -@isPowerOf2 public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size"; --- End diff -- indeed! my bad.
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2241 I have another chart now showing a comparison between master and this branch, just varying the number of splitter bolts in the topology. There are 2 ackers, 4 spouts, and 4 count bolts, all within a single worker and with max spout pending set to 500. All of the configs are the defaults, and it is on my laptop like before. ![chart](https://user-images.githubusercontent.com/3441321/28689893-de51856c-72dc-11e7-8f5b-8f2a77efdfd3.png) The scary thing here is that with these changes there is a tiny window where you get "good" throughput (let's say above 150k sentences per second) for this branch. The previous branch has a very, very wide window. The thing that concerns me the most with the way it is now is that there will be a lot of people who didn't tune the parallelism as low as possible, because it just works, and they will all have their topologies go from 180k/sec down to 50k/sec. And tuning them all perfectly to balance on that exact parallelism for the given heterogeneous hardware that we run on is going to be impossible.
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129945279 --- Diff: storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java --- @@ -21,101 +21,100 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import org.apache.storm.generated.NodeInfo; import org.apache.storm.messaging.IConnection; import org.apache.storm.messaging.TaskMessage; -import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TransferDrainer { - private Map> bundles = new HashMap(); + private Map bundles = new HashMap(); + private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class); - - public void add(HashMap taskTupleSetMap) { -for (Map.Entry entry : taskTupleSetMap.entrySet()) { - addListRefToMap(this.bundles, entry.getKey(), entry.getValue()); + + // Cache the msgs grouped by destination node + public void add(TaskMessage taskMsg) { +int destId = taskMsg.task(); +ArrayList msgs = bundles.get(destId); --- End diff -- Thanks for that sweet tip! Just to be sure, I would feel comfortable verifying with a profiler first... given this is in the critical path.
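[Editorial sketch] The diff above groups outgoing messages by destination in a map of per-destination lists. A common shape for that pattern, sketched below with hypothetical types (plain `int`/`String` stand in for Storm's task ids and `TaskMessage`), uses `Map.computeIfAbsent` to create the per-destination bundle lazily on the miss path instead of a separate `get()`/`put()` pair:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class DrainerSketch {
    // Bundles of pending messages, keyed by destination task id.
    private final Map<Integer, ArrayList<String>> bundles = new HashMap<>();

    /** Append a message to its destination's bundle, creating the bundle on first use. */
    public void add(int destTaskId, String msg) {
        bundles.computeIfAbsent(destTaskId, k -> new ArrayList<>()).add(msg);
    }

    /** Number of messages currently bundled for a destination (0 if none). */
    public int bundleSize(int destTaskId) {
        ArrayList<String> msgs = bundles.get(destTaskId);
        return msgs == null ? 0 : msgs.size();
    }
}
```

As the comment notes, whether `computeIfAbsent` (with its per-call lambda) beats an explicit `get`-then-`put` on a hot path is exactly the kind of thing worth confirming with a profiler first.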
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129938970 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java --- @@ -122,28 +130,33 @@ public Task(Executor executor, Integer taskId) throws IOException { return new ArrayList<>(0); } + public List getOutgoingTasks(String stream, List values) { if (debug) { LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values); } -List outTasks = new ArrayList<>(); -if (!streamComponentToGrouper.containsKey(stream)) { -throw new IllegalArgumentException("Unknown stream ID: " + stream); -} -if (null != streamComponentToGrouper.get(stream)) { -// null value for __system -for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) { +ArrayList outTasks = new ArrayList<>(); + +// TODO: PERF: expensive hashtable lookup in critical path --- End diff -- Pretty much all these todos are intended to be addressed in this PR itself. Just wanted to call them out during this review. There are maybe 2 perf todo observations that I am hoping to leave in alongside the offending code... as it took a lot of time to identify them (but they are not easy to fix here). So it is valuable to have them in, IMO.
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129937665 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/GrouperFactory.java --- @@ -137,7 +137,7 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks; +private ArrayListtargetTasks; --- End diff -- There is a minor diff... In particular, here I am critically counting on the stronger guarantee of constant indexing time, which is not provided by List<>.
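[Editorial sketch] The point being made is that declaring the field as `ArrayList` (rather than the `List` interface) documents an O(1) random-access guarantee the hot path relies on — a `List` reference could be backed by a `LinkedList`, where `get(i)` is O(n). A minimal, hypothetical illustration (not Storm's grouper code) of index-based task selection over such a list:

```java
import java.util.ArrayList;
import java.util.List;

public class IndexGrouperSketch {
    // Concrete ArrayList: get(i) is constant time, unlike an arbitrary List.
    private final ArrayList<Integer> targetTasks;
    private int idx = 0;

    public IndexGrouperSketch(List<Integer> tasks) {
        this.targetTasks = new ArrayList<>(tasks); // defensive copy into random-access storage
    }

    /** Pick the next target task id, round-robin, via constant-time indexing. */
    public int chooseTask() {
        int task = targetTasks.get(idx);
        idx = (idx + 1) % targetTasks.size();
        return task;
    }
}
```

(`java.util.RandomAccess` is the marker interface that signals this property; pinning the concrete type makes the assumption explicit at compile time.)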
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129936657 --- Diff: storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java --- @@ -30,4 +31,5 @@ void ack(Tuple input); void fail(Tuple input); void resetTimeout(Tuple input); +void flush(); --- End diff -- Ok Satish, let me see about that.
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129934123 --- Diff: conf/defaults.yaml --- @@ -304,6 +303,7 @@ storm.cgroup.resources: storm.cgroup.hierarchy.name: "storm" storm.supervisor.cgroup.rootdir: "storm" storm.cgroup.cgexec.cmd: "/bin/cgexec" +storm.cgroup.cgexec.cmd: "/bin/cgexec" --- End diff -- thanks!
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129930495 --- Diff: conf/defaults.yaml --- @@ -231,16 +228,13 @@ topology.multilang.serializer: "org.apache.storm.multilang.JsonSerializer" topology.shellbolt.max.pending: 100 topology.skip.missing.kryo.registrations: false topology.max.task.parallelism: null -topology.max.spout.pending: null +topology.max.spout.pending: null # TODO: We dont need this any more --- End diff -- The critical code path is now lock-free, so there should be no room for live/deadlocks. To be very precise, it's not exactly a blocking queue... as there is a retry loop that we can break out of at any time. There are some changes coming soon in this PR... wrt what happens when the queue is full. Will also add that info in the design doc.
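[Editorial sketch] The "retry loop we can break out of" idea described in the comment above can be sketched as follows. This is a hypothetical illustration, not JCQueue's code: a standard bounded queue stands in for the lock-free JCQueue just to make the sketch runnable, and the caller uses the non-blocking `offer()` plus a bounded retry loop instead of parking on a full queue, so it can bail out at any point.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class RetryPublishSketch {
    private final ArrayBlockingQueue<Object> q;

    public RetryPublishSketch(int capacity) {
        q = new ArrayBlockingQueue<>(capacity);
    }

    /** Try to publish without blocking; give up after maxRetries failed offers. */
    public boolean tryPublish(Object msg, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (q.offer(msg)) {
                return true;         // enqueued; offer() never parks the caller
            }
            Thread.onSpinWait();     // busy retry; the loop can be abandoned at any time
        }
        return false;                // caller decides what to do on a full queue: drop, flush, back off
    }

    public int size() {
        return q.size();
    }
}
```

The key property is that full-queue handling becomes a policy decision at the call site rather than an unbounded block inside the queue.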
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129931296 --- Diff: conf/defaults.yaml --- @@ -231,16 +228,13 @@ topology.multilang.serializer: "org.apache.storm.multilang.JsonSerializer" topology.shellbolt.max.pending: 100 topology.skip.missing.kryo.registrations: false topology.max.task.parallelism: null -topology.max.spout.pending: null +topology.max.spout.pending: null # TODO: We dont need this any more topology.state.synchronization.timeout.secs: 60 -topology.stats.sample.rate: 0.05 +topology.stats.sample.rate: 0.001 --- End diff -- OK, I personally feel that sampling can be reduced a bit, but if you have a preference towards retaining it I am OK with that. JCQueue is at the core of the messaging system... not separate. The numbers demonstrate the improvement.
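[Editorial sketch] The diff above lowers `topology.stats.sample.rate` from 0.05 to 0.001, i.e. from counting roughly one tuple in 20 to one in 1000, which cuts per-tuple bookkeeping in the critical path. A minimal count-based sampler illustrating the idea (this is a sketch of the concept, not Storm's actual sampler implementation):

```java
public class SamplerSketch {
    private final int period;   // sample once every 'period' events
    private int count = 0;

    public SamplerSketch(double rate) {
        // rate 0.05 -> period 20; rate 0.001 -> period 1000
        this.period = Math.max(1, (int) Math.round(1.0 / rate));
    }

    /** Returns true for roughly one call in every 'period' calls. */
    public boolean shouldSample() {
        if (++count >= period) {
            count = 0;
            return true;
        }
        return false;
    }
}
```

The trade-off debated in the thread is exactly this: a smaller rate means cheaper hot-path accounting but noisier, slower-to-converge stats.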
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129934167 --- Diff: examples/storm-elasticsearch-examples/src/main/java/org/apache/storm/elasticsearch/common/EsTestUtil.java --- @@ -43,7 +43,7 @@ public Fields getComponentOutputFields(String componentId, String streamId) { return new Fields("source", "index", "type", "id"); } }; -return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, ""); +return new TupleImpl(topologyContext, new Values(source, index, type, id), "testSrc", 1, ""); --- End diff -- fixed.
[GitHub] storm pull request #2241: STORM-2306 : Messaging subsystem redesign.
Github user roshannaik commented on a diff in the pull request: https://github.com/apache/storm/pull/2241#discussion_r129934940 --- Diff: examples/storm-perf/src/main/java/org/apache/storm/perf/ConstSpoutIdBoltNullBoltTopo.java --- @@ -76,6 +76,8 @@ public static StormTopology getTopology(Mapconf) { public static void main(String[] args) throws Exception { int runTime = -1; Config topoConf = new Config(); +topoConf.put(Config.TOPOLOGY_SPOUT_RECVQ_SKIPS, 8); --- End diff -- Higher numbers for that setting are only useful for very low-latency spouts. The default setting there assumes that you are likely to be using Kafka/HDFS kinds of spouts, which have higher latencies (amount of time spent inside nextTuple()).
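[Editorial sketch] The skip-count idea behind a setting like `TOPOLOGY_SPOUT_RECVQ_SKIPS` can be sketched as follows: a fast spout calls `nextTuple()` in a tight loop and only checks its receive queue every N iterations, trading ack/metrics responsiveness for fewer queue polls. The names and loop structure here are illustrative, not Storm's actual executor code.

```java
public class SpoutLoopSketch {
    private final int recvqSkips;   // poll the receive queue once per this many nextTuple() calls
    private int callCount = 0;
    int recvPolls = 0;              // exposed for illustration: how often the recv queue was drained
    int emits = 0;                  // exposed for illustration: how many nextTuple() calls were made

    public SpoutLoopSketch(int recvqSkips) {
        this.recvqSkips = recvqSkips;
    }

    /** One iteration of the (hypothetical) spout executor loop. */
    public void runOnce() {
        emits++;                              // stand-in for spout.nextTuple()
        if (++callCount >= recvqSkips) {      // only every Nth iteration...
            callCount = 0;
            recvPolls++;                      // ...stand-in for draining acks/metrics ticks
        }
    }
}
```

This also shows why high skip counts only pay off for very low-latency spouts: if `nextTuple()` itself is slow (Kafka/HDFS-style), queue polls are already rare relative to the time spent emitting.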
[RESULT] [VOTE] Release Apache Storm 1.0.4 (rc1)
This vote is now closed and passes with X binding +1 votes and no 0 or -1 votes. Vote tally (* indicates binding votes): +1: Stig Rohde Døssing* Bobby Evans* Kishor Patil* Jungtaek Lim* P. Taylor Goetz* I will release the staged artifacts and announce the release after the 24 hour waiting period. -Taylor > On Jul 24, 2017, at 2:24 PM, P. Taylor Goetz wrote: > > This is a call to vote on releasing Apache Storm 1.0.4 (rc1) > > Full list of changes in this release: > > https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;h=50878fab679973a1230466920006dc0746ffddd5;hb=eac433b0beb3798c4723deb39b3c4fad446378f4 > > The tag/commit to be voted upon is v1.0.4: > > https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;hb=a5e1c154b5b2ae74fd78bf10d4c130afb1ad4513 > > The source archive being voted upon can be found here: > > https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.4-rc1/apache-storm-1.0.4-src.tar.gz > > Other release files, signatures and digests can be found here: > > https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.4-rc1/ > > The release artifacts are signed with the following key: > > https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd > > The Nexus staging repository for this release is: > > https://repository.apache.org/content/repositories/orgapachestorm-1048 > > Please vote on releasing this package as Apache Storm 1.0.4. > > When voting, please list the actions taken to verify the release. > > This vote will be open for at least 72 hours. > > [ ] +1 Release this package as Apache Storm 1.0.4 > [ ] 0 No opinion > [ ] -1 Do not release this package because... > > Thanks to everyone who contributed to this release. > > -Taylor
Re: [VOTE] Release Apache Storm 1.0.4 (rc1)
+1 (binding) -Taylor > On Jul 24, 2017, at 2:24 PM, P. Taylor Goetz wrote: > > This is a call to vote on releasing Apache Storm 1.0.4 (rc1) > > Full list of changes in this release: > > https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;h=50878fab679973a1230466920006dc0746ffddd5;hb=eac433b0beb3798c4723deb39b3c4fad446378f4 > > The tag/commit to be voted upon is v1.0.4: > > https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;hb=a5e1c154b5b2ae74fd78bf10d4c130afb1ad4513 > > The source archive being voted upon can be found here: > > https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.4-rc1/apache-storm-1.0.4-src.tar.gz > > Other release files, signatures and digests can be found here: > > https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.0.4-rc1/ > > The release artifacts are signed with the following key: > > https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd > > The Nexus staging repository for this release is: > > https://repository.apache.org/content/repositories/orgapachestorm-1048 > > Please vote on releasing this package as Apache Storm 1.0.4. > > When voting, please list the actions taken to verify the release. > > This vote will be open for at least 72 hours. > > [ ] +1 Release this package as Apache Storm 1.0.4 > [ ] 0 No opinion > [ ] -1 Do not release this package because... > > Thanks to everyone who contributed to this release. > > -Taylor
[VOTE] Release Apache Storm 1.1.1 (rc2)
This is a call to vote on releasing Apache Storm 1.1.1 (rc2) Full list of changes in this release: https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;hb=41bfea87b1a002565333bd18a06d766af1ca3816 The tag/commit to be voted upon is v1.1.1: https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=948ce7d63a31fae8c478785985d0ef79808e234e;hb=41bfea87b1a002565333bd18a06d766af1ca3816 The source archive being voted upon can be found here: https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.1.1-rc2/apache-storm-1.1.1-src.tar.gz Other release files, signatures and digests can be found here: https://dist.apache.org/repos/dist/dev/storm/apache-storm-1.1.1-rc2/ The release artifacts are signed with the following key: https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd The Nexus staging repository for this release is: https://repository.apache.org/content/repositories/orgapachestorm-1050 Please vote on releasing this package as Apache Storm 1.1.1. When voting, please list the actions taken to verify the release. This vote will be open for at least 72 hours. [ ] +1 Release this package as Apache Storm 1.1.1 [ ] 0 No opinion [ ] -1 Do not release this package because... Thanks to everyone who contributed to this release. -Taylor
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2241 I have run some more tests looking at modifying the parallelism of different components. First I kept the parallelism of everything else at 4 and modified the acker count. ![chart_ackers](https://user-images.githubusercontent.com/3441321/28684646-bc460734-72ca-11e7-9434-8bdf2c263cab.png) I also kept the ackers at 2, spouts and counts at 4, and modified the splitter count. ![chart_splitters](https://user-images.githubusercontent.com/3441321/28684647-bc462f5c-72ca-11e7-91f8-0a4e1c748682.png) The acker drop-off at 5 is really scary, but adding too many splitters also shows a lot of problems. I am going to try something similar without the patch for comparison. Overall the numbers look really good in some situations, but it is really easy to slip into much worse territory. @knusbaum said that he was able to get a multi-worker setup to work, so that is something else I want to explore.
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2241 Sorry, but we also need to think about low throughput use cases. I have several that I care about, and I am seeing very long latency at low throughput. The min in some cases is 5 seconds, the max can be up to 20 seconds, the average is around 10 seconds, and the CPU utilization is 500%. This too needs to be addressed.
```
500 1 -c topology.workers=1
uptime: 30 acked: 4,000 acked/sec: 133.33 failed:0 99%: 9,923,723,263 99.9%: 9,999,220,735 min: 79,036,416 max: 10,015,997,951 mean: 5,861,829,371.65 stddev: 2,744,502,279.38 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 60 acked:15,000 acked/sec: 500.00 failed:0 99%: 14,646,509,567 99.9%: 14,973,665,279 min: 53,084,160 max: 15,023,996,927 mean: 7,410,713,531.31 stddev: 3,187,842,885.35 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 90 acked:16,000 acked/sec: 533.33 failed:0 99%: 14,747,172,863 99.9%: 14,990,442,495 min: 37,486,592 max: 15,032,385,535 mean: 7,947,532,282.45 stddev: 3,104,232,967.22 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 120 acked:14,000 acked/sec: 466.67 failed:0 99%: 14,856,224,767 99.9%: 14,998,831,103 min: 65,208,320 max: 15,023,996,927 mean: 9,071,752,875.48 stddev: 3,337,053,852.19 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 150 acked:13,000 acked/sec: 433.33 failed:0 99%: 14,914,945,023 99.9%: 14,998,831,103 min: 4,999,610,368 max: 15,074,328,575 mean: 10,374,946,814.88 stddev: 2,794,778,136.42 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 180 acked:16,000 acked/sec: 533.33 failed:0 99%: 14,940,110,847 99.9%: 15,049,162,751 min: 5,007,998,976 max: 15,602,810,879 mean: 10,539,964,609.74 stddev: 2,796,155,497.39 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 210 acked:15,000 acked/sec: 500.00 failed:0 99%: 14,881,390,591 99.9%: 14,998,831,103 min: 5,003,804,672 max: 15,015,608,319 mean: 9,616,077,147.72 stddev: 2,781,415,317.06 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 240 acked:10,000 acked/sec: 333.33 failed:0 99%: 14,889,779,199 99.9%: 15,007,219,711 min: 5,003,804,672 max: 15,015,608,319 mean: 9,840,073,724.86 stddev: 2,806,028,726.32 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 270 acked:16,000 acked/sec: 533.33 failed:0 99%: 17,951,621,119 99.9%: 19,780,337,663 min: 5,003,804,672 max: 20,015,218,687 mean: 10,556,609,171.18 stddev: 3,010,780,308.43 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 300 acked:15,000 acked/sec: 500.00 failed:0 99%: 14,898,167,807 99.9%: 14,998,831,103 min: 51,445,760 max: 15,023,996,927 mean: 9,694,508,448.06 stddev: 3,087,190,409.09 user: 0 sys: 0 gc: 0 mem: 0.00
```
I am fine with the goals and the design work being done for this. If you can do better than the stuff I did for disruptor, by all means rip out my code and make things better. The low throughput issue was one I had to fix with my initial patches to disruptor. People do care about this. I am not trying to be a jerk; I am just trying to keep my customers happy, share some of my experience doing something similar in the past, and also hopefully make storm much, much better in the end. I apologize if I offended anyone. It was not my intention, but I really was shocked to see a patch everyone was touting as the best thing since sliced bread turn out decidedly worse in every way for a topology that worked really well before. I was able to max out the default configuration with a parallelism of 4 at 100,000 sentences per second fully acked. The new patch could only handle a third of that, and not when there is more than 1 worker.
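The latency columns in the report above are in nanoseconds, which is why `min: 5,003,804,672` corresponds to the roughly 5-second minimum mentioned in the prose. A small standalone helper (the class name is illustrative, not part of the benchmark) makes the conversion explicit:

```java
// Illustrative helper: converts the nanosecond latency figures printed by the
// ThroughputVsLatency benchmark into seconds for easier reading.
public class LatencyUnits {

    // The benchmark reports percentiles, min, max, mean, and stddev in nanoseconds.
    public static double nanosToSeconds(long nanos) {
        return nanos / 1_000_000_000.0;
    }

    public static void main(String[] args) {
        // "min: 5,003,804,672" from the report above is about 5 seconds.
        System.out.printf("%.2f s%n", nanosToSeconds(5_003_804_672L));
    }
}
```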
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user revans2 commented on the issue: https://github.com/apache/storm/pull/2241 @harshach Reiterating what @HeartSaVioR said about benchmarking. Most benchmarking is done by pushing a system to its limits to see what maximum throughput it can do. This is far from what a real user wants. It looks good for a vendor to brag "I can do X but that other vendor over there can only do Y," but it is close to worthless for what real users want to know. Real users are trying to balance the cost of the system in $ (CPU time + memory used become this: how many EC-whatever instances do I need), the amount of data they can push through the system, and how quickly they can get results back. Each of these variables is reflected by this test. In most cases a user has a set load that they typically get, and a reasonable guess at the maximum load they expect to see. Also, most users have a deadline by which the data is no good any more; if not, they should be using batch. And a budget that they have to spend on this project; if not, call me, I want to work for you and my salary requirements are very reasonable. You need to give users tools to explore all three, and because the three are intertwined you want to be able to hold one or two of the variables constant while you measure the others. Storm currently has no way to set a target SLA (I hope to add one eventually), but you can control the rate at which messages arrive and the parallelism of the topology (which reflects the cost). So the goal is to scan through various throughput values and various parallelisms to see what the latency is and what resources are actually used. In the real world we would adjust the heap size and parallelism accordingly. Complaining about a benchmark creating 51 threads relates to the parallelism that we want to explore. If that is what I did wrong in the benchmark, I am happy to adjust and reevaluate. I want to understand how the parallelism impacts this code.
What concerns me now is that scaling a topology appears to work very differently, and I want to understand exactly how. I cannot easily roll out a change to my customers saying things might get a lot better or they might get a lot worse. We need to make it easy for a user with a topology that may not have been ideal (but worked well) to continue to work well.
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2241 I have updated all the results for TVL with the second parameter set to 1. Also added rate 5. The CPU usage on current master doesn't fluctuate at any of the rates, even 5, whereas with this patch the CPU usage sometimes fluctuates around 100%.
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2241 Let me share a quick test result with passing `1 1` to the TVL parameters:

> STORM-2306

```
uptime: 30 acked: 144,000 acked/sec: 4,800.00 failed:0 99%: 3,070,230,527 99.9%: 3,221,225,471 min: 63,897,600 max: 3,380,609,023 mean: 1,299,365,069.36 stddev: 685,287,508.16 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 60 acked: 303,000 acked/sec: 10,100.00 failed:0 99%: 3,011,510,271 99.9%: 3,200,253,951 min: 28,540,928 max: 3,303,014,399 mean: 1,283,728,691.41 stddev: 671,791,145.42 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 90 acked: 297,000 acked/sec: 9,900.00 failed:0 99%: 3,047,161,855 99.9%: 3,307,208,703 min: 62,980,096 max: 3,737,124,863 mean: 1,283,141,447.64 stddev: 675,126,086.16 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 120 acked: 303,000 acked/sec: 10,100.00 failed:0 99%: 3,047,161,855 99.9%: 3,206,545,407 min: 31,965,184 max: 3,347,054,591 mean: 1,284,140,763.79 stddev: 690,625,730.54 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 150 acked: 299,000 acked/sec: 9,966.67 failed:0 99%: 3,072,327,679 99.9%: 3,231,711,231 min: 16,703,488 max: 3,414,163,455 mean: 1,320,620,493.23 stddev: 693,327,734.87 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 180 acked: 300,000 acked/sec: 10,000.00 failed:0 99%: 3,042,967,551 99.9%: 3,248,488,447 min: 48,005,120 max: 3,846,176,767 mean: 1,313,068,274.86 stddev: 671,810,427.83 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 210 acked: 301,000 acked/sec: 10,033.33 failed:0 99%: 3,061,841,919 99.9%: 3,363,831,807 min: 51,347,456 max: 3,802,136,575 mean: 1,297,807,219.57 stddev: 678,980,965.35 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 240 acked: 301,000 acked/sec: 10,033.33 failed:0 99%: 3,019,898,879 99.9%: 3,208,642,559 min: 36,962,304 max: 3,363,831,807 mean: 1,315,037,518.24 stddev: 676,620,121.79 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 270 acked: 297,000 acked/sec: 9,900.00 failed:0 99%: 3,026,190,335 99.9%: 3,200,253,951 min: 52,363,264 max: 3,349,151,743 mean: 1,308,161,023.51 stddev: 680,121,348.29 user: 0 sys: 0 gc: 0 mem: 0.00
uptime: 300 acked: 300,000 acked/sec: 10,000.00 failed:0 99%: 3,021,996,031 99.9%: 3,200,253,951 min: 49,348,608 max: 3,317,694,463 mean: 1,335,928,012.31 stddev: 667,642,145.32 user: 0 sys: 0 gc: 0 mem: 0.00
```

CPU usage was around 150~250%, mostly around 160%, which seemed a bit more stable, but still fluctuating even at a small rate.

> current

```
uptime: 30 acked: 140,440 acked/sec: 4,681.33 failed:0 99%: 14,016,511 99.9%: 26,558,463 min: 2,449,408 max: 52,035,583 mean:7,800,556.68 stddev:1,790,982.79 user: 28,620 sys: 2,340 gc: 0 mem: 49.94
uptime: 60 acked: 301,860 acked/sec: 10,062.00 failed:0 99%: 11,141,119 99.9%: 15,351,807 min: 3,233,792 max: 26,181,631 mean:7,479,081.72 stddev:1,175,253.40 user: 30,270 sys: 6,800 gc:190 mem: 54.88
uptime: 90 acked: 301,600 acked/sec: 10,053.33 failed:0 99%: 10,813,439 99.9%: 13,197,311 min: 3,246,080 max: 16,138,239 mean:7,375,841.06 stddev:1,112,541.35 user: 31,660 sys: 7,160 gc:194 mem: 54.68
uptime: 120 acked: 301,460 acked/sec: 10,048.67 failed:0 99%: 11,042,815 99.9%: 13,828,095 min: 3,266,560 max: 17,285,119 mean:7,400,672.94 stddev:1,130,409.32 user: 29,650 sys: 7,330 gc:200 mem: 47.80
uptime: 150 acked: 301,500 acked/sec: 10,050.00 failed:0 99%: 10,911,743 99.9%: 13,246,463 min: 3,248,128 max: 15,654,911 mean:7,399,041.82 stddev:1,118,368.85 user: 29,920 sys: 7,370 gc:199 mem: 41.95
uptime: 180 acked: 301,540 acked/sec: 10,051.33 failed:0 99%: 10,969,087 99.9%: 13,598,719 min: 3,233,792 max: 16,302,079 mean:7,390,435.62 stddev:1,129,976.16 user: 29,840 sys: 7,190 gc:201 mem: 41.16
uptime: 210 acked: 301,540 acked/sec: 10,051.33 failed:0 99%: 11,182,079 99.9%: 14,557,183 min: 3,246,080 max: 19,513,343 mean:7,382,121.55 stddev:1,161,863.92 user: 29,620 sys: 7,460 gc:
```
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129809356 --- Diff: docs/Windowing.md --- @@ -266,3 +266,108 @@ tuples can be received within the timeout period. An example topology `SlidingWindowTopology` shows how to use the apis to compute a sliding window sum and a tumbling window average. +## Stateful windowing +The default windowing implementation in storm stores the tuples in memory until they are processed and expired from the +window. This limits the use cases to windows that +fit entirely in memory. Also the source tuples cannot be ack-ed until the window expiry requiring large message timeouts +(topology.message.timeout.secs should be larger than the window length + sliding interval). This also puts extra load +due to the complex acking and anchoring requirements. + +To address the above limitations and to support larger window sizes, storm provides stateful windowing support via `IStatefulWindowedBolt`. +User bolts should typically extend `BaseStatefulWindowedBolt` for the windowing operations with the framework automatically +managing the state of the window in the background. + +If the sources provide a monotonically increasing identifier as a part of the message, the framework can use this +to periodically checkpoint the last expired and evaluated message ids, to avoid duplicate window evaluations in case of +failures or restarts. During recovery, the tuples with message ids lower than last expired id are discarded and tuples with +message id between the last expired and last evaluated message ids are fed into the system without activating any previously +activated windows. +The tuples beyond the last evaluated message ids are processed as usual. This can be enabled by setting +the `messageIdField` as shown below, +
```java +topologyBuilder.setBolt("mybolt", + new MyStatefulWindowedBolt() + .withWindow(...) 
// windowing configurations + .withMessageIdField("msgid"), // a monotonically increasing 'long' field in the tuple + parallelism) + .shuffleGrouping("spout"); +``` + +However, this option is feasible only if the sources can provide a monotonically increasing identifier in the tuple and the same is maintained +while re-emitting the messages in case of failures. With this option the tuples are still buffered in memory until processed +and expired from the window. + +For more details take a look at the sample topology in storm-starter [StatefulWindowingTopology](../examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulWindowingTopology.java) which will help you get started. + +### Window checkpointing + +With window checkpointing, the monotonically increasing id is no longer required since the framework transparently saves the state of the window periodically into the configured state backend. +The state that is saved includes the tuples in the window, any system state that is required to recover the state of processing +and also the user state. + +```java +topologyBuilder.setBolt("mybolt", + new MyStatefulPersistentWindowedBolt() + .withWindow(...) // windowing configurations + .withPersistence() // persist the window state + .withMaxEventsInMemory(25000), // max number of events to be cached in memory +parallelism) + .shuffleGrouping("spout"); + +``` + +The `withPersistence` instructs the framework to transparently save the tuples in window along with +any associated system and user state to the state backend. The `withMaxEventsInMemory` is an optional +configuration that specifies the maximum number of tuples that may be kept in memory. The tuples are transparently loaded from +the state backend as required and the ones that are most likely to be used again are retained in memory. 
+ +The state backend can be configured by setting the topology state provider config, + +```java +// use redis for state persistence +conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider"); + +``` +Currently storm supports Redis and HBase as state backends and uses the underlying state-checkpointing +framework for saving the window state. For more details on state checkpointing see [State-checkpointing.md](State-checkpointing.md) + +Here is an example of a persistent windowed bolt that uses the window checkpointing to save its state. The `initState` +is invoked with the last saved state (user state) at initialization time. The execute method is invoked based on
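The recovery rule described earlier for `messageIdField` (discard tuples below the last expired id, replay those between the last expired and last evaluated ids without re-activating windows, process the rest as usual) can be sketched as a standalone decision function. The class, enum, and boundary handling below are my reading of the documentation, not Storm's actual implementation:

```java
// Illustrative sketch of the recovery filtering described for messageIdField.
public class RecoveryFilter {

    public enum Action { DISCARD, REPLAY_WITHOUT_ACTIVATION, PROCESS }

    // lastExpired and lastEvaluated are the periodically checkpointed message ids.
    public static Action actionFor(long msgId, long lastExpired, long lastEvaluated) {
        if (msgId < lastExpired) {
            // already expired from the window before the failure; safe to drop
            return Action.DISCARD;
        } else if (msgId <= lastEvaluated) {
            // rebuild window contents, but skip re-triggering already evaluated windows
            return Action.REPLAY_WITHOUT_ACTIVATION;
        }
        // beyond the last evaluated id: normal processing
        return Action.PROCESS;
    }

    public static void main(String[] args) {
        System.out.println(actionFor(5, 10, 20));  // below last expired
        System.out.println(actionFor(15, 10, 20)); // between expired and evaluated
        System.out.println(actionFor(25, 10, 20)); // beyond last evaluated
    }
}
```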
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2241 @harshach The second argument effectively represents the worker count: you can see that the topology sets the worker count to the parallelism. I agree that the name is really misleading; even I ran tests with topology.workers instead of passing the second argument. (need to run the tests again...)
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user harshach commented on the issue: https://github.com/apache/storm/pull/2241 @HeartSaVioR It's not 12 executors per worker. If you don't pass a command-line argument, it sets the parallelism variable to 4 here https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java#L277 and multiplies it by 4 here again https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/ThroughputVsLatency.java#L359 . So it sets a parallelism of 16 per component. This has nothing to do with how many workers you have.
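As I read the two linked lines, the per-component executor count works out as sketched below. The class and method names are illustrative; the actual logic lives in ThroughputVsLatency.java:

```java
// Illustrative sketch of how ThroughputVsLatency derives executors per component.
public class TvlParallelismSketch {

    // The second command-line argument defaults to 4 and is then
    // multiplied by 4 when setting each component's parallelism.
    public static int executorsPerComponent(int parallelismArg) {
        return parallelismArg * 4;
    }

    public static void main(String[] args) {
        // With no argument passed, parallelism defaults to 4,
        // yielding 16 executors per component.
        System.out.println(executorsPerComponent(4));
    }
}
```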
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r128993342 --- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java --- @@ -0,0 +1,563 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.topology; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import org.apache.storm.Config; +import org.apache.storm.state.KeyValueState; +import org.apache.storm.state.State; +import org.apache.storm.state.StateFactory; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.DefaultEvictionContext; +import org.apache.storm.windowing.Event; +import org.apache.storm.windowing.EventImpl; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.emptyIterator; +import static org.apache.storm.topology.WindowPartitionCache.CacheLoader; +import static org.apache.storm.topology.WindowPartitionCache.RemovalCause; +import static org.apache.storm.topology.WindowPartitionCache.RemovalListener; + +/** + * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying + * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory + * by transparently caching the window partitions and checkpointing them as needed. 
+ */ +public class PersistentWindowedBoltExecutor extends WindowedBoltExecutor implements IStatefulBolt { +private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class); +private final IStatefulWindowedBolt statefulWindowedBolt; +private transient TopologyContext topologyContext; --- End diff -- nit: may want to remove this instance as it is never used.
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129003027 --- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java --- @@ -0,0 +1,563 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.topology; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import org.apache.storm.Config; +import org.apache.storm.state.KeyValueState; +import org.apache.storm.state.State; +import org.apache.storm.state.StateFactory; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.DefaultEvictionContext; +import org.apache.storm.windowing.Event; +import org.apache.storm.windowing.EventImpl; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.emptyIterator; +import static org.apache.storm.topology.WindowPartitionCache.CacheLoader; +import static org.apache.storm.topology.WindowPartitionCache.RemovalCause; +import static org.apache.storm.topology.WindowPartitionCache.RemovalListener; + +/** + * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying + * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory + * by transparently caching the window partitions and checkpointing them as needed. 
+ */ +public class PersistentWindowedBoltExecutor extends WindowedBoltExecutor implements IStatefulBolt { +private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class); +private final IStatefulWindowedBolt statefulWindowedBolt; +private transient TopologyContext topologyContext; +private transient OutputCollector outputCollector; +private transient WindowState state; +private transient boolean stateInitialized; +private transient boolean prePrepared; + +public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) { +super(bolt); +statefulWindowedBolt = bolt; +} + +@Override +public void prepare(MaptopoConf, TopologyContext context, OutputCollector collector) { +List registrations = (List) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>()); +registrations.add(ConcurrentLinkedQueue.class.getName()); +registrations.add(LinkedList.class.getName()); +registrations.add(AtomicInteger.class.getName()); +registrations.add(EventImpl.class.getName()); +registrations.add(WindowPartition.class.getName()); +registrations.add(DefaultEvictionContext.class.getName()); +topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations); +prepare(topoConf, context, collector, +getWindowState(topoConf, context), +getPartitionState(topoConf, context), +getWindowSystemState(topoConf, context)); +} + +// package access for unit tests +void prepare(Map topoConf, TopologyContext context, OutputCollector collector, + KeyValueState
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129791075 --- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java --- @@ -0,0 +1,596 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.topology; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import org.apache.storm.Config; +import org.apache.storm.state.KeyValueState; +import org.apache.storm.state.State; +import org.apache.storm.state.StateFactory; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.DefaultEvictionContext; +import org.apache.storm.windowing.Event; +import org.apache.storm.windowing.EventImpl; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.emptyIterator; +import static org.apache.storm.topology.WindowPartitionCache.CacheLoader; +import static org.apache.storm.topology.WindowPartitionCache.RemovalCause; +import static org.apache.storm.topology.WindowPartitionCache.RemovalListener; + +/** + * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying + * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory + * by transparently caching the window partitions and checkpointing them as needed. 
+ */ +public class PersistentWindowedBoltExecutor extends WindowedBoltExecutor implements IStatefulBolt { +private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class); +private final IStatefulWindowedBolt statefulWindowedBolt; +private transient TopologyContext topologyContext; +private transient OutputCollector outputCollector; +private transient WindowState state; +private transient boolean stateInitialized; +private transient boolean prePrepared; + +public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) { +super(bolt); +statefulWindowedBolt = bolt; +} + +@Override +public void prepare(MaptopoConf, TopologyContext context, OutputCollector collector) { +List registrations = (List) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>()); +registrations.add(ConcurrentLinkedQueue.class.getName()); +registrations.add(LinkedList.class.getName()); +registrations.add(AtomicInteger.class.getName()); +registrations.add(EventImpl.class.getName()); +registrations.add(WindowPartition.class.getName()); +registrations.add(DefaultEvictionContext.class.getName()); +topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations); +prepare(topoConf, context, collector, +getWindowState(topoConf, context), +getPartitionState(topoConf, context), +getWindowSystemState(topoConf, context)); +} + +@Override +protected void validate(Map topoConf, +
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r128991661 --- Diff: storm-client/src/jvm/org/apache/storm/topology/base/BaseStatefulWindowedBolt.java --- @@ -151,6 +158,38 @@ return this; } +/** + * If set, the stateful windowed bolt would use the backend state for window persistence and + * only keep a sub-set of events in memory as specified by {@link #withMaxEventsInMemory(long)}. + */ +public BaseStatefulWindowedBolt withPersistence() { +persistent = true; +return this; +} + +/** + * The maximum number of window events to keep in memory. This is meaningful only if + * {@link #withPersistence()} is also set. As the number of events in memory grows close + * to the maximum, the events that are less likely to be used again are evicted and persisted. + * The default value for this is {@code 1,000,000}. + * + * @param maxEventsInMemory the maximum number of window events to keep in memory + */ +public BaseStatefulWindowedBolt withMaxEventsInMemory(long maxEventsInMemory) { --- End diff -- We can explore limiting by memory size later, as that would give the user a better handle on memory resources, and could work with an on-heap/off-heap cache.
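The idea behind `withMaxEventsInMemory` — keep only a bounded number of window events in memory and persist the rest to the state backend — can be sketched as a standalone class. This is not Storm's implementation (which evicts the events least likely to be used again and writes to Redis/HBase state); here a plain list stands in for the backend and the oldest events are spilled first:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Standalone sketch of a bounded in-memory window that spills to a backend.
public class BoundedWindowSketch<T> {
    private final int maxInMemory;
    private final Deque<T> inMemory = new ArrayDeque<>();
    private final List<T> persisted = new ArrayList<>(); // stand-in for Redis/HBase state

    public BoundedWindowSketch(int maxInMemory) {
        this.maxInMemory = maxInMemory;
    }

    public void add(T event) {
        inMemory.addLast(event);
        // Once past the limit, spill the oldest events to the "backend".
        while (inMemory.size() > maxInMemory) {
            persisted.add(inMemory.removeFirst());
        }
    }

    public int inMemoryCount() { return inMemory.size(); }
    public int persistedCount() { return persisted.size(); }

    public static void main(String[] args) {
        BoundedWindowSketch<Integer> w = new BoundedWindowSketch<>(3);
        for (int i = 0; i < 5; i++) w.add(i);
        System.out.println(w.inMemoryCount() + " in memory, " + w.persistedCount() + " persisted");
    }
}
```

The review comment's suggestion, limiting by memory size rather than event count, would replace the `size() > maxInMemory` check with an estimate of bytes held.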
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129803794 --- Diff: storm-client/src/jvm/org/apache/storm/topology/WindowPartitionCache.java --- @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.topology; + +import java.util.concurrent.ConcurrentMap; + +/** + * A loading cache abstraction for caching {@link PersistentWindowedBoltExecutor.WindowPartition}. + * + * @param the key type + * @param the value type + */ +public interface WindowPartitionCache{ + +/** + * Get value from the cache or load the value. + * + * @param key the key + * @return the value + */ +V get(K key); + +/** + * Get value from the cache or load the value pinning it + * so that the entry will never get evicted. + * + * @param key the key + * @return the value + */ +V getPinned(K key); --- End diff -- This method name can be changed to `pinAndGet` as `getPinned` was implies getting the value if it is pinned. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
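The pin/unpin semantics under discussion can be sketched independently of Storm. This minimal, hypothetical cache (all names illustrative, not the Storm API) shows why `pinAndGet` describes the operation better than `getPinned`: the method loads the value if absent *and* pins it, rather than returning an already-pinned value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the pin/unpin semantics discussed above.
class PinningCache<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private final Map<K, Integer> pinCounts = new HashMap<>();
    private final Function<K, V> loader;

    PinningCache(Function<K, V> loader) {
        this.loader = loader;
    }

    // Loads (if absent) AND pins: the name describes both effects, whereas
    // "getPinned" suggests returning a value only if it is already pinned.
    V pinAndGet(K key) {
        V v = values.computeIfAbsent(key, loader);
        pinCounts.merge(key, 1, Integer::sum);
        return v;
    }

    // Decrements the pin count; returns true once the entry is fully unpinned.
    boolean unpin(K key) {
        Integer c = pinCounts.get(key);
        if (c == null) {
            return true;                 // never pinned
        }
        if (c <= 1) {
            pinCounts.remove(key);
            return true;
        }
        pinCounts.put(key, c - 1);
        return false;
    }

    boolean isPinned(K key) {
        return pinCounts.containsKey(key);
    }
}
```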
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129760482 --- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java --- @@ -0,0 +1,563 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.topology; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import org.apache.storm.Config; +import org.apache.storm.state.KeyValueState; +import org.apache.storm.state.State; +import org.apache.storm.state.StateFactory; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.DefaultEvictionContext; +import org.apache.storm.windowing.Event; +import org.apache.storm.windowing.EventImpl; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.emptyIterator; +import static org.apache.storm.topology.WindowPartitionCache.CacheLoader; +import static org.apache.storm.topology.WindowPartitionCache.RemovalCause; +import static org.apache.storm.topology.WindowPartitionCache.RemovalListener; + +/** + * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying + * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory + * by transparently caching the window partitions and checkpointing them as needed. 
+ */ +public class PersistentWindowedBoltExecutor extends WindowedBoltExecutor implements IStatefulBolt { +private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class); +private final IStatefulWindowedBolt statefulWindowedBolt; +private transient TopologyContext topologyContext; +private transient OutputCollector outputCollector; +private transient WindowState state; +private transient boolean stateInitialized; +private transient boolean prePrepared; + +public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) { +super(bolt); +statefulWindowedBolt = bolt; +} + +@Override +public void prepare(MaptopoConf, TopologyContext context, OutputCollector collector) { +List registrations = (List) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>()); +registrations.add(ConcurrentLinkedQueue.class.getName()); +registrations.add(LinkedList.class.getName()); +registrations.add(AtomicInteger.class.getName()); +registrations.add(EventImpl.class.getName()); +registrations.add(WindowPartition.class.getName()); +registrations.add(DefaultEvictionContext.class.getName()); +topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations); +prepare(topoConf, context, collector, +getWindowState(topoConf, context), +getPartitionState(topoConf, context), +getWindowSystemState(topoConf, context)); +} + +// package access for unit tests +void prepare(Map topoConf, TopologyContext context, OutputCollector collector, + KeyValueState
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r128992641

--- Diff: storm-client/src/jvm/org/apache/storm/topology/IStatefulWindowedBolt.java ---
@@ -15,12 +15,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.topology;

 import org.apache.storm.state.State;

 /**
- * A windowed bolt abstraction for supporting windowing operation with state
+ * A windowed bolt abstraction for supporting windowing operation with state.
  */
 public interface IStatefulWindowedBolt<T extends State> extends IStatefulComponent<T>, IWindowedBolt {
+    /**
+     * If the stateful windowed bolt should have its windows persisted in state and maintain a subset
+     * (recent events) in memory.
--- End diff --

This may be frequent events rather than recent events.
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user satishd commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129804787 --- Diff: storm-client/src/jvm/org/apache/storm/topology/SimpleWindowPartitionCache.java --- @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.topology; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple implementation that evicts the largest un-pinned entry from the cache. This works well + * for caching window partitions since the access pattern is mostly sequential scans. 
+ */
+public class SimpleWindowPartitionCache<K, V> implements WindowPartitionCache<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleWindowPartitionCache.class);
+
+    private final ConcurrentSkipListMap<K, V> map = new ConcurrentSkipListMap<>();
+    private final Map<K, Integer> pinned = new HashMap<>();
+    private final long maximumSize;
+    private final RemovalListener<K, V> removalListener;
+    private final CacheLoader<K, V> cacheLoader;
+    private final ReentrantLock lock = new ReentrantLock(true);
+    private int size;
+
+    @Override
+    public V get(K key) {
+        return getOrLoad(key, false);
+    }
+
+    @Override
+    public V getPinned(K key) {
+        return getOrLoad(key, true);
+    }
+
+    @Override
+    public boolean unpin(K key) {
+        LOG.debug("unpin '{}'", key);
+        boolean res = false;
+        try {
+            lock.lock();
+            if (pinned.compute(key, (k, v) -> v == null ? 0 : v - 1) <= 0) {
+                pinned.remove(key);
+                res = true;
+            }
+        } finally {
+            lock.unlock();
+        }
+        LOG.debug("pinned '{}'", pinned);
+        return res;
+    }
+
+    @Override
+    public ConcurrentMap<K, V> asMap() {
+        return map;
+    }
+
+    @Override
+    public void invalidate(K key) {
+        try {
+            lock.lock();
+            if (isPinned(key)) {
+                LOG.debug("Entry '{}' is pinned, skipping invalidation", key);
+            } else {
+                LOG.debug("Invalidating entry '{}'", key);
+                V val = map.remove(key);
+                if (val != null) {
+                    --size;
+                    pinned.remove(key);
+                    removalListener.onRemoval(key, val, RemovalCause.EXPLICIT);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // Get or load from the cache, optionally pinning the entry
+    // so that it wont get evicted from the cache
+    private V getOrLoad(K key, boolean shouldPin) {
+        V val;
+        if (shouldPin) {
+            try {
+                lock.lock();
+                val = load(key);
+                pin(key);
+            } finally {
+                lock.unlock();
+            }
+        } else {
+            val = map.get(key);
--- End diff --

Why are we checking here, when this is already checked in `load(key)`?
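One plausible reading of the `getOrLoad` code under discussion is the classic "check, lock, check again" pattern: the first `map.get(key)` is a lock-free fast path, and `load(key)` repeats the check under the lock so two racing threads do not both invoke the loader. A self-contained sketch (simplified from the diff; `loadCount` is added here purely for demonstration):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Double-checked load: the unlocked map.get is a fast path; load() repeats
// the check under the lock so racing threads don't both invoke the loader.
class DoubleCheckedCache<K, V> {
    private final ConcurrentMap<K, V> map = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Function<K, V> loader;
    int loadCount = 0; // demo only: how many times the loader actually ran

    DoubleCheckedCache(Function<K, V> loader) {
        this.loader = loader;
    }

    V getOrLoad(K key) {
        V val = map.get(key);          // fast path, no lock taken
        if (val == null) {
            lock.lock();
            try {
                val = load(key);       // re-checks under the lock
            } finally {
                lock.unlock();
            }
        }
        return val;
    }

    private V load(K key) {
        V val = map.get(key);          // another thread may have loaded it
        if (val == null) {
            val = loader.apply(key);
            loadCount++;
            map.put(key, val);
        }
        return val;
    }
}
```

Under this reading the outer check is an optimization rather than a redundancy: repeated hits never contend on the lock.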
Re: [GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Hello Roshan,

Thank you for your detailed answer. Details are important, because in my organization I am often asked to re-assess the reasons why we chose Storm over its competitors.

Best regards,
Alexandre Vermeerbergen

2017-07-25 23:36 GMT+02:00 roshannaik:
> Github user roshannaik commented on the issue:
>
> https://github.com/apache/storm/pull/2241
>
> @avermeer Looks like the SuperChief blog is relaying the same basic claims
> that Heron has marketed. Since you ask, I will share my opinions wrt
> Heron's claims.
>
> - Heron has never been a player in the high performance club. They
> have been smart about not comparing themselves with the real top performers
> of the day. I only included them here because they have made much noise
> against Storm. They are smart about not mentioning which version of Storm
> they are comparing with (how does a paper with such critical info missing
> get accepted?). That creates an illusion that their perf claims apply to
> all versions of Storm in general... even if Storm [publishes new perf
> numbers](hortonworks.com/blog/microbenchmarking-storm-1-0-performance/)
> comparing itself to a prior version.
> - Heron's threading model (1 thread per process, based on what I gather
> from their articles) is really primitive for this application domain. I
> don't recommend it, but by setting 'topology.workers' equal to the number
> of spout & bolt instances, Storm can be run in Heron mode.
> - I find it much easier to debug a process with multiple components
> using a debugger rather than starting a separate debugger for every
> running instance of a spout or bolt. Also, I would imagine, having so many
> processes means you have an explosion of log files to deal with when
> triaging.
> - It is unclear why the recovery model (when a worker process crashes) is
> any better... the same kind of replay from the spout would be required. The
> gains may be minor if any. Making minor optimizations to the failure path
> while penalizing the normal operation path... is backwards.
> - Can't get a stack trace from a Storm worker? That's clearly false. Try it
> yourself; I do it all the time. Heap dumps, on the other hand, can stall the
> worker, and if the heap size is really large the supervisor might decide the
> worker is having a problem. There are timeouts you can increase to make the
> supervisor wait longer. I can't imagine that Heron doesn't monitor their
> workers and restart them if they are not responsive.
> - Heron's backpressure model is simply too heavyweight, but is marketed
> as a novel idea.
> - A quick read of their latest perf blog, noted in the comparison, made it
> evident that they missed recognizing their real perf problem.
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user HeartSaVioR commented on the issue: https://github.com/apache/storm/pull/2241

@harshach For ThroughputVsLatency, throttling the spout is intended. We set a desired throughput and look at the histogram of latency and other metrics (CPU, GC, etc.). There are 3 components in the topology, whose parallelism would be set to 4 * worker count, so 12 executor threads per worker in total. I think we can parameterize the magic number 4 and adjust it while testing too.

I have also done some performance tests without modifying the TVL topology. The reason is that we should also care about non-performance-maximized topologies. For benchmarking a performance-maximized topology we also have ConstSpoutIdBoltNullBoltTopo, so let's not modify TVL and instead verify that this patch works in all the cases. Since this patch doesn't seem to handle inter-worker communication properly, the set of tests we can run for now is very limited.

Here's my machine spec used for the performance test:
```
java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
```
```
OS: Ubuntu 17.04
CPU: AMD Ryzen 5 1600 3.2Ghz 6 core (with hyper-thread = 12 logical cores)
RAM: Samsung DDR4 32G 19200
SSD: Samsung 850 Evo
```
and here are my numbers (pasted raw): https://docs.google.com/spreadsheets/d/1J3S4R68CsazlINF60rQn4XCy2Hx5QNzqVoWSO9qo2tc/edit?usp=sharing

My observation is that this patch looks impressive with a performance-maximized topology, but it also looks really bad (not acceptable) with a relatively idle topology. I've observed all the things @revans2 observed with the TVL tests. But this patch looks stable with ConstSpoutIdBoltNullBoltTopo, and even CPU usage seems lower than stock in this test. While we often publicize micro-benchmark results, in practice users run much more idle topologies.

I'm OK if things can be stabilized by adjusting parameters (if so, I think those should become the default values), but if not, it should be addressed before accepting the patch. I would be -1 if the TVL result is not stable.
[GitHub] storm issue #2241: STORM-2306 : Messaging subsystem redesign.
Github user harshach commented on the issue: https://github.com/apache/storm/pull/2241

@revans2 @HeartSaVioR Here are my findings: https://docs.google.com/spreadsheets/d/1wPpC3YXp-vTIelRTUVoLxuxIYxUekiIHUpZ2ZEysC4Y/edit#gid=1644511...

1. Looking at ThroughputVsLatency I found some issues:
- By default it adds 51 total threads, which IMO is incorrect when benchmarking on a 4-core machine.
- It also adds two bolts for logging/measurements, which might be impacting the numbers: https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/... https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/...
- It also throttles the spout: https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/...

I made the following changes:
- Disabled the HTTP and logging bolts
- Disabled the throttling spout; we want the spout to run as fast as it can
- Reduced the executor counts

If you look at lines 78-102, Apache master clearly couldn't handle the faster spout and starts timing out. Perf degrades considerably and very quickly. STORM-2306, on the other hand, was not only able to handle the faster spout with stable processing, it starts out being 10x faster and improves to 35x faster compared to master.

2. Also ran the storm-perf topologies ConstSpoutIdNullBoltIdTopo and ConstSpoutNullBoltTopo. These topologies measure the message throughput and latency when there are only 2 components involved, without any external dependencies; essentially they test the messaging system. From lines 3-45 you can see that with this patch we are getting under 10ms (depending on the topology) compared to an avg of 250ms+ (with batchSize=1).

3. Also ran the storm-examples ThroughputVsLatency with 2 workers. Here there is clearly a bug which is preventing inter-worker communication, so I don't have comparative numbers.
[GitHub] storm issue #2218: STORM-2614: Enhance stateful windowing to persist the win...
Github user arunmahadevan commented on the issue: https://github.com/apache/storm/pull/2218 @srdo, addressed your comments, let me know if I missed something.
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763588 --- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java --- @@ -0,0 +1,563 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.topology; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import org.apache.storm.Config; +import org.apache.storm.state.KeyValueState; +import org.apache.storm.state.State; +import org.apache.storm.state.StateFactory; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.DefaultEvictionContext; +import org.apache.storm.windowing.Event; +import org.apache.storm.windowing.EventImpl; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.emptyIterator; +import static org.apache.storm.topology.WindowPartitionCache.CacheLoader; +import static org.apache.storm.topology.WindowPartitionCache.RemovalCause; +import static org.apache.storm.topology.WindowPartitionCache.RemovalListener; + +/** + * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying + * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory + * by transparently caching the window partitions and checkpointing them as needed. 
+ */ +public class PersistentWindowedBoltExecutor extends WindowedBoltExecutor implements IStatefulBolt { +private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class); +private final IStatefulWindowedBolt statefulWindowedBolt; +private transient TopologyContext topologyContext; +private transient OutputCollector outputCollector; +private transient WindowState state; +private transient boolean stateInitialized; +private transient boolean prePrepared; + +public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) { +super(bolt); +statefulWindowedBolt = bolt; +} + +@Override +public void prepare(MaptopoConf, TopologyContext context, OutputCollector collector) { +List registrations = (List) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>()); +registrations.add(ConcurrentLinkedQueue.class.getName()); +registrations.add(LinkedList.class.getName()); +registrations.add(AtomicInteger.class.getName()); +registrations.add(EventImpl.class.getName()); +registrations.add(WindowPartition.class.getName()); +registrations.add(DefaultEvictionContext.class.getName()); +topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations); +prepare(topoConf, context, collector, +getWindowState(topoConf, context), +getPartitionState(topoConf, context), +getWindowSystemState(topoConf, context)); +} + +// package access for unit tests +void prepare(Map topoConf, TopologyContext context, OutputCollector collector, + KeyValueState
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763692 --- Diff: storm-client/src/jvm/org/apache/storm/topology/SimpleWindowPartitionCache.java --- @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.topology; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple implementation that evicts the largest un-pinned entry from the cache. This works well + * for caching window partitions since the access pattern is mostly sequential scans. 
+ */ +public class SimpleWindowPartitionCacheimplements WindowPartitionCache { +private static final Logger LOG = LoggerFactory.getLogger(SimpleWindowPartitionCache.class); + +private final ConcurrentSkipListMap map = new ConcurrentSkipListMap<>(); +private final Map pinned = new HashMap<>(); +private final long maximumSize; +private final RemovalListener removalListener; +private final CacheLoader cacheLoader; +private final ReentrantLock lock = new ReentrantLock(true); +private int size; + +@Override +public V get(K key) { +return getOrLoad(key, false); +} + +@Override +public V getPinned(K key) { +return getOrLoad(key, true); +} + +@Override +public boolean unpin(K key) { +LOG.debug("unpin '{}'", key); +boolean res = false; +try { +lock.lock(); +if (pinned.compute(key, (k, v) -> v == null ? 0 : v - 1) <= 0) { +pinned.remove(key); +res = true; +} +} finally { +lock.unlock(); +} +LOG.debug("pinned '{}'", pinned); +return res; +} + +@Override +public ConcurrentMap asMap() { +return map; +} + +@Override +public void invalidate(K key) { +try { +lock.lock(); +if (isPinned(key)) { +LOG.debug("Entry '{}' is pinned, skipping invalidation", key); +} else { +LOG.debug("Invalidating entry '{}'", key); +V val = map.remove(key); +if (val != null) { +--size; +pinned.remove(key); +removalListener.onRemoval(key, val, RemovalCause.EXPLICIT); +} +} +} finally { +lock.unlock(); +} +} + +// Get or load from the cache optionally pinning the entry +// so that it wont get evicted from the cache +private V getOrLoad(K key, boolean shouldPin) { +V val; +if (shouldPin) { +try { +lock.lock(); +val = load(key); +pin(key); +} finally { +lock.unlock(); +} +} else { +val = map.get(key); +if (val == null) { +try { +lock.lock(); +val = load(key); +} finally { +lock.unlock(); +} +} +} + +return val; +} + +private V load(K key) { +V val = map.get(key); +if (val == null) { +val = cacheLoader.load(key); +if (val == null) { +throw new NullPointerException("Null value"); +} +
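The `load` method quoted above throws a `NullPointerException` when the cache loader returns null, mirroring the contract of loading caches such as Guava's `CacheLoader`: a loader must either produce a value or fail, never return null. A minimal sketch of that contract (hypothetical class, not the Storm code):

```java
import java.util.function.Function;

// Mirrors the contract in the diff above (and in Guava's CacheLoader):
// a loader must not return null; the cache surfaces that as an NPE at
// load time rather than caching a null.
class NullRejectingLoader<K, V> {
    private final Function<K, V> delegate;

    NullRejectingLoader(Function<K, V> delegate) {
        this.delegate = delegate;
    }

    V load(K key) {
        V val = delegate.apply(key);
        if (val == null) {
            throw new NullPointerException("Loader returned null for key: " + key);
        }
        return val;
    }
}
```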
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763781 --- Diff: storm-client/src/jvm/org/apache/storm/windowing/TupleWindowIterImpl.java --- @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.windowing; + +import com.google.common.collect.Iterators; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.function.Supplier; +import org.apache.storm.tuple.Tuple; + +/** + * An iterator based implementation over the events in a window. 
+ */
+public class TupleWindowIterImpl implements TupleWindow {
+    private final Supplier<Iterator<Tuple>> tuplesIt;
+    private final Supplier<Iterator<Tuple>> newTuplesIt;
+    private final Supplier<Iterator<Tuple>> expiredTuplesIt;
+    private final Long startTimestamp;
+    private final Long endTimestamp;
+
+    public TupleWindowIterImpl(Supplier<Iterator<Tuple>> tuplesIt,
+                               Supplier<Iterator<Tuple>> newTuplesIt,
+                               Supplier<Iterator<Tuple>> expiredTuplesIt,
+                               Long startTimestamp, Long endTimestamp) {
+        this.tuplesIt = tuplesIt;
+        this.newTuplesIt = newTuplesIt;
+        this.expiredTuplesIt = expiredTuplesIt;
+        this.startTimestamp = startTimestamp;
+        this.endTimestamp = endTimestamp;
+    }
+
+    @Override
+    public List<Tuple> get() {
+        List<Tuple> tuples = new ArrayList<>();
+        tuplesIt.get().forEachRemaining(t -> tuples.add(t));
+        return tuples;
+    }
+
+    @Override
+    public List<Tuple> getNew() {
+        throw new UnsupportedOperationException("Not implemented");
--- End diff --

It's not straightforward to return the new and expired tuple iterators without persisting those tuples separately and maintaining that state, so for now it's unsupported. The newTuplesIt and expiredTuplesIt receive null values for now.
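The supplier-backed window design above can be sketched in a runnable form, with `String` standing in for Storm's `Tuple` so the example is self-contained: `get()` materializes the supplier's iterator into a list on each call, while `getNew()` throws because new/expired events are not persisted separately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Sketch of the iterator-backed window from the diff above, with String
// standing in for Tuple. get() materializes the supplier's iterator;
// getNew() is unsupported, as discussed in the review comment.
class IterWindow {
    private final Supplier<Iterator<String>> tuplesIt;

    IterWindow(Supplier<Iterator<String>> tuplesIt) {
        this.tuplesIt = tuplesIt;
    }

    List<String> get() {
        List<String> tuples = new ArrayList<>();
        tuplesIt.get().forEachRemaining(tuples::add);
        return tuples;
    }

    List<String> getNew() {
        throw new UnsupportedOperationException("Not implemented");
    }
}
```

Because the supplier produces a fresh iterator per call, `get()` can be invoked repeatedly without exhausting a single shared iterator.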
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763573 --- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java --- @@ -0,0 +1,563 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.topology; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +import org.apache.storm.Config; +import org.apache.storm.state.KeyValueState; +import org.apache.storm.state.State; +import org.apache.storm.state.StateFactory; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.DefaultEvictionContext; +import org.apache.storm.windowing.Event; +import org.apache.storm.windowing.EventImpl; +import org.apache.storm.windowing.WindowLifecycleListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Collections.emptyIterator; +import static org.apache.storm.topology.WindowPartitionCache.CacheLoader; +import static org.apache.storm.topology.WindowPartitionCache.RemovalCause; +import static org.apache.storm.topology.WindowPartitionCache.RemovalListener; + +/** + * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying + * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory + * by transparently caching the window partitions and checkpointing them as needed. 
+ */ +public class PersistentWindowedBoltExecutor extends WindowedBoltExecutor implements IStatefulBolt { +private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class); +private final IStatefulWindowedBolt statefulWindowedBolt; +private transient TopologyContext topologyContext; +private transient OutputCollector outputCollector; +private transient WindowState state; +private transient boolean stateInitialized; +private transient boolean prePrepared; + +public PersistentWindowedBoltExecutor(IStatefulWindowedBolt bolt) { +super(bolt); +statefulWindowedBolt = bolt; +} + +@Override +public void prepare(MaptopoConf, TopologyContext context, OutputCollector collector) { +List registrations = (List) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>()); +registrations.add(ConcurrentLinkedQueue.class.getName()); +registrations.add(LinkedList.class.getName()); +registrations.add(AtomicInteger.class.getName()); +registrations.add(EventImpl.class.getName()); +registrations.add(WindowPartition.class.getName()); +registrations.add(DefaultEvictionContext.class.getName()); +topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations); +prepare(topoConf, context, collector, +getWindowState(topoConf, context), +getPartitionState(topoConf, context), +getWindowSystemState(topoConf, context)); +} + +// package access for unit tests +void prepare(Map topoConf, TopologyContext context, OutputCollector collector, + KeyValueState
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763661

--- Diff: storm-client/src/jvm/org/apache/storm/topology/SimpleWindowPartitionCache.java ---
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple implementation that evicts the largest un-pinned entry from the cache. This works well
+ * for caching window partitions since the access pattern is mostly sequential scans.
+ */
+public class SimpleWindowPartitionCache<K, V> implements WindowPartitionCache<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleWindowPartitionCache.class);
+
+    private final ConcurrentSkipListMap<K, V> map = new ConcurrentSkipListMap<>();
+    private final Map<K, Integer> pinned = new HashMap<>();
+    private final long maximumSize;
+    private final RemovalListener<K, V> removalListener;
+    private final CacheLoader<K, V> cacheLoader;
+    private final ReentrantLock lock = new ReentrantLock(true);
+    private int size;
+
+    @Override
+    public V get(K key) {
+        return getOrLoad(key, false);
+    }
+
+    @Override
+    public V getPinned(K key) {
+        return getOrLoad(key, true);
+    }
+
+    @Override
+    public boolean unpin(K key) {
+        LOG.debug("unpin '{}'", key);
+        boolean res = false;
+        try {
+            lock.lock();
+            if (pinned.compute(key, (k, v) -> v == null ? 0 : v - 1) <= 0) {
+                pinned.remove(key);
+                res = true;
+            }
+        } finally {
+            lock.unlock();
+        }
+        LOG.debug("pinned '{}'", pinned);
+        return res;
+    }
+
+    @Override
+    public ConcurrentMap<K, V> asMap() {
+        return map;
+    }
+
+    @Override
+    public void invalidate(K key) {
+        try {
+            lock.lock();
+            if (isPinned(key)) {
+                LOG.debug("Entry '{}' is pinned, skipping invalidation", key);
+            } else {
+                LOG.debug("Invalidating entry '{}'", key);
+                V val = map.remove(key);
+                if (val != null) {
+                    --size;
+                    pinned.remove(key);
+                    removalListener.onRemoval(key, val, RemovalCause.EXPLICIT);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    // Get or load from the cache, optionally pinning the entry
+    // so that it won't get evicted from the cache
+    private V getOrLoad(K key, boolean shouldPin) {
+        V val;
+        if (shouldPin) {
+            try {
+                lock.lock();
+                val = load(key);
+                pin(key);
+            } finally {
+                lock.unlock();
+            }
+        } else {
+            val = map.get(key);
+            if (val == null) {
+                try {
+                    lock.lock();
+                    val = load(key);
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+        return val;
+    }
+
+    private V load(K key) {
+        V val = map.get(key);
+        if (val == null) {
+            val = cacheLoader.load(key);
+            if (val == null) {
+                throw new NullPointerException("Null value");
--- End diff --
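For reviewers who want to poke at the pin/unpin semantics in isolation, here is a self-contained sketch (hypothetical and heavily simplified — no loading, eviction, or generics) of the reference-counted bookkeeping that `unpin` above performs via `pinned.compute`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Simplified, stand-alone model of SimpleWindowPartitionCache's pin counting:
// an entry may be pinned multiple times; unpin() returns true only when the
// count drops to zero, i.e. the entry becomes eligible for eviction again.
public class Main {
    private final Map<String, Integer> pinned = new HashMap<>();
    private final ReentrantLock lock = new ReentrantLock(true);

    void pin(String key) {
        lock.lock();
        try {
            pinned.merge(key, 1, Integer::sum); // bump the per-key pin count
        } finally {
            lock.unlock();
        }
    }

    boolean unpin(String key) {
        lock.lock();
        try {
            // mirrors the compute(...) in the class under review
            if (pinned.compute(key, (k, v) -> v == null ? 0 : v - 1) <= 0) {
                pinned.remove(key);
                return true; // fully un-pinned
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        Main cache = new Main();
        cache.pin("partition-0");
        cache.pin("partition-0");
        System.out.println(cache.unpin("partition-0")); // false: one pin left
        System.out.println(cache.unpin("partition-0")); // true: count reached zero
    }
}
```

Note that, as in the code under review, unpinning a key that was never pinned also reports `true` (the `compute` lambda yields 0) — the multi-threaded unit tests asked for above could pin this behaviour down.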
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763759

--- Diff: storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java ---
@@ -127,26 +154,30 @@ private void validate(Map<String, Object> topoConf, Count windowLengthCount, Dur
 throw new IllegalArgumentException("Window length is not specified");
 }
-if (windowLengthDuration != null && slidingIntervalDuration != null) {
-    ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
-} else if (windowLengthDuration != null) {
-    ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
-} else if (slidingIntervalDuration != null) {
-    ensureDurationLessThanTimeout(slidingIntervalDuration.value, topologyTimeout);
-}
+if (isPersistent()) {
+    ensureCheckpointIntervalLessThanTimeout(getCheckpointIntervalSecs(topoConf), topologyTimeout);
--- End diff --

we don't need to validate `message timeout > window length + sliding interval` here since the tuples are acked during checkpoint. Hence ensuring the `timeout > checkpoint interval` would suffice.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
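The check being discussed can be sketched as follows (method name and message text are illustrative, not the exact Storm internals): for persistent windows only the checkpoint interval needs to fit inside the message timeout, since tuples are acked at each checkpoint rather than on window expiry.

```java
// Hypothetical stand-in for WindowedBoltExecutor's validation: reject a
// configuration whose checkpoint interval exceeds the topology message timeout.
public class Main {
    static void ensureCheckpointIntervalLessThanTimeout(int checkpointIntervalSecs, int timeoutSecs) {
        if (checkpointIntervalSecs > timeoutSecs) {
            throw new IllegalArgumentException(
                "Checkpoint interval " + checkpointIntervalSecs
                    + "s exceeds topology.message.timeout.secs " + timeoutSecs + "s");
        }
    }

    public static void main(String[] args) {
        ensureCheckpointIntervalLessThanTimeout(30, 300); // fine: 30s < 300s
        try {
            ensureCheckpointIntervalLessThanTimeout(600, 300);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```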
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763707

--- Diff: storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java ---
@@ -102,6 +109,18 @@ private int getMaxSpoutPending(Map<String, Object> topoConf) {
 return maxPending;
 }
+private int getCheckpointIntervalSecs(Map<String, Object> topoConf) {
--- End diff --

it can be refactored
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763468

--- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them as needed.
+ */
+public class PersistentWindowedBoltExecutor<T extends State> extends WindowedBoltExecutor implements IStatefulBolt<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+    private final IStatefulWindowedBolt<T> statefulWindowedBolt;
+    private transient TopologyContext topologyContext;
+    private transient OutputCollector outputCollector;
+    private transient WindowState<Tuple> state;
+    private transient boolean stateInitialized;
+    private transient boolean prePrepared;
+
+    public PersistentWindowedBoltExecutor(IStatefulWindowedBolt<T> bolt) {
+        super(bolt);
+        statefulWindowedBolt = bolt;
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        List<String> registrations = (List<String>) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+        registrations.add(ConcurrentLinkedQueue.class.getName());
+        registrations.add(LinkedList.class.getName());
+        registrations.add(AtomicInteger.class.getName());
+        registrations.add(EventImpl.class.getName());
+        registrations.add(WindowPartition.class.getName());
+        registrations.add(DefaultEvictionContext.class.getName());
+        topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+        prepare(topoConf, context, collector,
+                getWindowState(topoConf, context),
+                getPartitionState(topoConf, context),
+                getWindowSystemState(topoConf, context));
+    }
+
+    // package access for unit tests
+    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
+                 KeyValueState
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763520

--- Diff: storm-client/src/jvm/org/apache/storm/topology/PersistentWindowedBoltExecutor.java ---
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.util.AbstractCollection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+
+import org.apache.storm.Config;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.state.StateFactory;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.windowing.DefaultEvictionContext;
+import org.apache.storm.windowing.Event;
+import org.apache.storm.windowing.EventImpl;
+import org.apache.storm.windowing.WindowLifecycleListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.emptyIterator;
+import static org.apache.storm.topology.WindowPartitionCache.CacheLoader;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalCause;
+import static org.apache.storm.topology.WindowPartitionCache.RemovalListener;
+
+/**
+ * Wraps a {@link IStatefulWindowedBolt} and handles the execution. Uses state and the underlying
+ * checkpointing mechanisms to save the tuples in window to state. The tuples are also kept in-memory
+ * by transparently caching the window partitions and checkpointing them as needed.
+ */
+public class PersistentWindowedBoltExecutor<T extends State> extends WindowedBoltExecutor implements IStatefulBolt<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowedBoltExecutor.class);
+    private final IStatefulWindowedBolt<T> statefulWindowedBolt;
+    private transient TopologyContext topologyContext;
+    private transient OutputCollector outputCollector;
+    private transient WindowState<Tuple> state;
+    private transient boolean stateInitialized;
+    private transient boolean prePrepared;
+
+    public PersistentWindowedBoltExecutor(IStatefulWindowedBolt<T> bolt) {
+        super(bolt);
+        statefulWindowedBolt = bolt;
+    }
+
+    @Override
+    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+        List<String> registrations = (List<String>) topoConf.getOrDefault(Config.TOPOLOGY_STATE_KRYO_REGISTER, new ArrayList<>());
+        registrations.add(ConcurrentLinkedQueue.class.getName());
+        registrations.add(LinkedList.class.getName());
+        registrations.add(AtomicInteger.class.getName());
+        registrations.add(EventImpl.class.getName());
+        registrations.add(WindowPartition.class.getName());
+        registrations.add(DefaultEvictionContext.class.getName());
+        topoConf.put(Config.TOPOLOGY_STATE_KRYO_REGISTER, registrations);
+        prepare(topoConf, context, collector,
+                getWindowState(topoConf, context),
+                getPartitionState(topoConf, context),
+                getWindowSystemState(topoConf, context));
+    }
+
+    // package access for unit tests
+    void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector,
+                 KeyValueState
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763418

--- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/PersistentWindowingTopology.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.starter;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.topology.base.BaseStatefulWindowedBolt;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} with window
+ * persistence.
+ *
+ * The framework automatically checkpoints the tuples in window along with the bolt's state and restores the same
+ * during restarts.
+ *
+ * This topology uses 'redis' for state persistence, so you should also start a redis instance before deploying.
+ * If you are running in local mode you can just start a redis server locally which will be used for storing the state.
+ * The default RedisKeyValueStateProvider parameters can be overridden by setting "topology.state.provider.config", e.g.
+ *
+ * {
+ *   "jedisPoolConfig": {
+ *     "host": "redis-server-host",
+ *     "port": 6379,
+ *     "timeout": 2000,
+ *     "database": 0,
+ *     "password": "xyz"
+ *   }
+ * }
+ *
+ */
+public class PersistentWindowingTopology {
+    private static final Logger LOG = LoggerFactory.getLogger(PersistentWindowingTopology.class);
+
+    // wrapper to hold global and window averages
+    private static class Averages {
+        private final double global;
+        private final double window;
+
+        Averages(double global, double window) {
+            this.global = global;
+            this.window = window;
+        }
+
+        @Override
+        public String toString() {
+            return "Averages{" + "global=" + String.format("%.2f", global) + ", window=" + String.format("%.2f", window) + '}';
+        }
+    }
+
+    /**
+     * A bolt that uses stateful persistence to store the windows along with the state (global avg).
+     */
+    private static class AvgBolt extends BaseStatefulWindowedBolt<KeyValueState<String, Pair<Long, Long>>> {
+        private static final String STATE_KEY = "avg";
+
+        private OutputCollector collector;
+        private KeyValueState<String, Pair<Long, Long>> state;
+        private Pair<Long, Long> globalAvg;
+
+        @Override
+        public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void initState(KeyValueState<String, Pair<Long, Long>> state) {
+            this.state = state;
+            globalAvg = state.get(STATE_KEY, Pair.of(0L, 0L));
+            LOG.info("initState with global avg [" + (double) globalAvg._1 / globalAvg._2 + "]");
--- End diff --

_1 and _2 are exposed to be consistent with other `TupleN` classes. Will use the getters here.
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763373

--- Diff: docs/Windowing.md ---
@@ -266,3 +266,105 @@ tuples can be received within the timeout period.
 
 An example topology `SlidingWindowTopology` shows how to use the apis to compute a sliding window sum and a tumbling window average.
+## Stateful windowing
+The default windowing implementation in storm stores the tuples in memory until they are processed and expired from the
+window. This limits the use cases to windows that fit entirely in memory. Also the source tuples cannot be acked until
+the window expires, requiring large message timeouts (`topology.message.timeout.secs` should be larger than the window
+length + sliding interval). This also adds extra load due to the complex acking and anchoring requirements.
+
+To address the above limitations and to support larger window sizes, storm provides stateful windowing support via `IStatefulWindowedBolt`.
+User bolts should typically extend `BaseStatefulWindowedBolt` for the windowing operations, with the framework automatically
+managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of the message, the framework can use this
+to periodically checkpoint the last expired and evaluated message ids, to avoid duplicate window evaluations in case of
+failures or restarts. During recovery, the tuples with message ids lower than the last expired id are discarded and tuples with
+message ids between the last expired and last evaluated message ids are fed into the system without activating any triggers.
+The tuples beyond the last evaluated message ids are processed as usual. This can be enabled by setting
+the `messageIdField` as shown below:
+
+```java
+topologyBuilder.setBolt("mybolt",
+                        new MyStatefulWindowedBolt()
+                        .withWindow(...) // windowing configurations
+                        .withMessageIdField("msgid"), // a monotonically increasing 'long' field in the tuple
+                        parallelism)
+               .shuffleGrouping("spout");
+```
+
+However, this option is feasible only if the sources can provide a monotonically increasing identifier in the tuple and the same is maintained
+while re-emitting the messages in case of failures. Here the tuples in window are still held in memory.
+
+For more details take a look at the sample topology in storm starter `StatefulWindowingTopology` which will help you get started.
--- End diff --

yes, will link to the file
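The recovery behaviour the doc describes reduces to a three-way classification of each replayed message id against the two checkpointed watermarks. The sketch below is illustrative only — the enum and method names are not Storm's API, and whether the boundaries are inclusive is an assumption here:

```java
// Classify a replayed message id against the checkpointed last-expired and
// last-evaluated ids, per the recovery behaviour described in the doc.
public class Main {
    enum Action { DISCARD, REPLAY_WITHOUT_TRIGGER, PROCESS_NORMALLY }

    static Action classify(long msgId, long lastExpired, long lastEvaluated) {
        if (msgId <= lastExpired) {
            return Action.DISCARD; // already expired from the window
        } else if (msgId <= lastEvaluated) {
            return Action.REPLAY_WITHOUT_TRIGGER; // rebuild window state, skip triggers
        }
        return Action.PROCESS_NORMALLY;
    }

    public static void main(String[] args) {
        System.out.println(classify(5, 10, 20));
        System.out.println(classify(15, 10, 20));
        System.out.println(classify(25, 10, 20));
    }
}
```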
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763340

--- Diff: docs/Windowing.md ---
@@ -266,3 +266,105 @@ tuples can be received within the timeout period.
 
 An example topology `SlidingWindowTopology` shows how to use the apis to compute a sliding window sum and a tumbling window average.
+## Stateful windowing
+The default windowing implementation in storm stores the tuples in memory until they are processed and expired from the
+window. This limits the use cases to windows that fit entirely in memory. Also the source tuples cannot be acked until
+the window expires, requiring large message timeouts (`topology.message.timeout.secs` should be larger than the window
+length + sliding interval). This also adds extra load due to the complex acking and anchoring requirements.
+
+To address the above limitations and to support larger window sizes, storm provides stateful windowing support via `IStatefulWindowedBolt`.
+User bolts should typically extend `BaseStatefulWindowedBolt` for the windowing operations, with the framework automatically
+managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of the message, the framework can use this
+to periodically checkpoint the last expired and evaluated message ids, to avoid duplicate window evaluations in case of
+failures or restarts. During recovery, the tuples with message ids lower than the last expired id are discarded and tuples with
+message ids between the last expired and last evaluated message ids are fed into the system without activating any triggers.
--- End diff --

yes, it's the trigger policy instances. will reword.
[GitHub] storm pull request #2218: STORM-2614: Enhance stateful windowing to persist ...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/2218#discussion_r129763359

--- Diff: docs/Windowing.md ---
@@ -266,3 +266,105 @@ tuples can be received within the timeout period.
 
 An example topology `SlidingWindowTopology` shows how to use the apis to compute a sliding window sum and a tumbling window average.
+## Stateful windowing
+The default windowing implementation in storm stores the tuples in memory until they are processed and expired from the
+window. This limits the use cases to windows that fit entirely in memory. Also the source tuples cannot be acked until
+the window expires, requiring large message timeouts (`topology.message.timeout.secs` should be larger than the window
+length + sliding interval). This also adds extra load due to the complex acking and anchoring requirements.
+
+To address the above limitations and to support larger window sizes, storm provides stateful windowing support via `IStatefulWindowedBolt`.
+User bolts should typically extend `BaseStatefulWindowedBolt` for the windowing operations, with the framework automatically
+managing the state of the window in the background.
+
+If the sources provide a monotonically increasing identifier as a part of the message, the framework can use this
+to periodically checkpoint the last expired and evaluated message ids, to avoid duplicate window evaluations in case of
+failures or restarts. During recovery, the tuples with message ids lower than the last expired id are discarded and tuples with
+message ids between the last expired and last evaluated message ids are fed into the system without activating any triggers.
+The tuples beyond the last evaluated message ids are processed as usual. This can be enabled by setting
+the `messageIdField` as shown below:
+
+```java
+topologyBuilder.setBolt("mybolt",
+                        new MyStatefulWindowedBolt()
+                        .withWindow(...) // windowing configurations
+                        .withMessageIdField("msgid"), // a monotonically increasing 'long' field in the tuple
+                        parallelism)
+               .shuffleGrouping("spout");
+```
+
+However, this option is feasible only if the sources can provide a monotonically increasing identifier in the tuple and the same is maintained
+while re-emitting the messages in case of failures. Here the tuples in window are still held in memory.
--- End diff --

will reword: With this option the tuples are still buffered in memory until processed and expired from the window.