[jira] [Reopened] (STORM-1347) ui changes to display the topology version.
[ https://issues.apache.org/jira/browse/STORM-1347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Parth Brahmbhatt reopened STORM-1347:

Accidentally closed.

> ui changes to display the topology version.
>
> Key: STORM-1347
> URL: https://issues.apache.org/jira/browse/STORM-1347
> Project: Apache Storm
> Issue Type: Sub-task
> Components: storm-core
> Reporter: Parth Brahmbhatt
> Assignee: Parth Brahmbhatt

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (STORM-1347) ui changes to display the topology version.
[ https://issues.apache.org/jira/browse/STORM-1347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Parth Brahmbhatt resolved STORM-1347.
Resolution: Fixed

> ui changes to display the topology version.
>
> Key: STORM-1347
> URL: https://issues.apache.org/jira/browse/STORM-1347
> Project: Apache Storm
> Issue Type: Sub-task
> Components: storm-core
> Reporter: Parth Brahmbhatt
> Assignee: Parth Brahmbhatt
[GitHub] storm pull request: use Put#addColumn to replace the deprecated Pu...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/1353#issuecomment-213607240 @lujinhong Can you file a jira to make the move. I think I have a patch lying around in my repo which brings some perf improvements in addition to moving the code to newer APIs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request: STORM-1030. Hive Connector Fixes.
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/871#issuecomment-200564783 This PR has been open for a long time. I am still +1 and will merge it this weekend if no one objects.
[GitHub] storm pull request: STORM-1604:Delayed transition should handle No...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/1188#issuecomment-199499535 @harshach fixed.
[GitHub] storm pull request: STORM-1604:Delayed transition should handle No...
GitHub user Parth-Brahmbhatt opened a pull request:

    https://github.com/apache/storm/pull/1188

    STORM-1604:Delayed transition should handle NotALeaderException

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-1604

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1188.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1188

commit b95fcd79cb1c2225cd0816f4e2b6eaaf7a2ffd13
Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com>
Date: 2016-03-05T00:09:09Z

    STORM-1604:Delayed transition should handle NotALeaderException
[jira] [Created] (STORM-1604) Delayed transition should handle NotALeaderException
Parth Brahmbhatt created STORM-1604:

Summary: Delayed transition should handle NotALeaderException
Key: STORM-1604
URL: https://issues.apache.org/jira/browse/STORM-1604
Project: Apache Storm
Issue Type: Bug
Components: storm-core
Affects Versions: 1.0.0
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt
Fix For: 1.0.0

Currently, if an action (kill, rebalance) is scheduled with a delay, nimbus stores the state in zookeeper and then schedules a delayed event to do the final transition. If the leader nimbus loses leadership during this wait time, the delayed operation receives a NotALeaderException when it executes. The exception is not handled, which causes the nimbus to die. We should catch the exception and ignore it.
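The fix proposed in STORM-1604 can be illustrated with a minimal sketch. It is not the Storm patch: the `NotALeaderException` class below is a hypothetical stand-in declared locally, `DelayedTransitionSketch` and `ignoringLostLeadership` are made-up names, and a plain `ScheduledExecutorService` stands in for nimbus's delayed-event mechanism. It only shows the shape of "catch the exception and ignore it" around a delayed operation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the exception named in the issue; not Storm's real class.
class NotALeaderException extends RuntimeException {
    NotALeaderException(String message) {
        super(message);
    }
}

class DelayedTransitionSketch {
    // Counts transitions that were skipped because leadership had been lost.
    static final AtomicInteger skipped = new AtomicInteger();

    // Wraps the final transition so a lost-leadership failure is swallowed
    // instead of escaping from the timer thread. Other exceptions still propagate.
    static Runnable ignoringLostLeadership(Runnable transition) {
        return () -> {
            try {
                transition.run();
            } catch (NotALeaderException e) {
                skipped.incrementAndGet();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // Simulates a nimbus that lost leadership while the delay was pending.
        Runnable transition = () -> {
            throw new NotALeaderException("no longer the leader");
        };
        timer.schedule(ignoringLostLeadership(transition), 50, TimeUnit.MILLISECONDS);
        timer.shutdown(); // already-scheduled delayed tasks still run by default
        timer.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("skipped transitions: " + skipped.get());
    }
}
```

Only the lost-leadership case is swallowed here; whether other failures should also be tolerated is a separate decision that the issue does not address.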
[GitHub] storm pull request: STORM-1569: Adding option in nimbus to specify...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/1144#issuecomment-188447027 @knusbaum I think we should. We have seen RejectedExecutionException on more than 5 different customer clusters. I can backport this if others also agree.
[GitHub] storm pull request: Fix Log4j2.xml config to output the the timest...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/1145#issuecomment-187957776 +1.
Re: [DISCUSS] Java REST Framework adoption
+1 on DropWizard.

On 2/23/16, 2:02 PM, "Harsha" wrote:

> -1 on spring boot or anything related to spring.
> This api is intended to be very simple powering UI and any rest clients
> interested in grabbing the metrics from the same api as UI does.
>
> Jersey is good and dropwizard (http://www.dropwizard.io/0.9.2/docs/)
> has been a way to go for java REST api of late. Underneath it uses
> jersey and one can run jetty server as well which is what we have as the
> UI and logviewer server.
>
> -Harsha
>
> On Tue, Feb 23, 2016, at 08:23 AM, Ravi Sharma wrote:
>> spring boot +
>>
>> Ravi
>>
>> On Tue, Feb 23, 2016 at 3:16 PM, Ankur Garg wrote:
>>
>> > How about using Spring Boot & Jersey for writing this. Spring Boot will
>> > give us a packaged jar which once executed will bring up its own embedded
>> > server (Jetty or Tomcat or some other). Although Spring Boot has some
>> > disadvantages as well, it is worth investigating this option too.
>> >
>> > Any thoughts??
>> >
>> > Thanks
>> > Ankur
>> >
>> > On Tue, Feb 23, 2016 at 8:20 PM, Bobby Evans wrote:
>> >
>> > > Yes, we need to pick something. I have used Jersey in the past and I
>> > > think it is fairly decent. I have never used RESTEasy, but it is more or
>> > > less the same API, so either one is fine with me, but Jersey is my vote
>> > > just because of experience.
>> > >
>> > > You should keep in mind that we are currently on a very old version of
>> > > jetty, and I am not sure if newer libraries will work with it. But also
>> > > the old versions of ring and hiccup that we use don't support newer jetty
>> > > versions either.
>> > >
>> > > I personally think that now would be a good time to separate out the UI
>> > > into a separate package + classpath. This would allow us to package the UI
>> > > as both a war with embedded jetty as a default option to run it; start from
>> > > scratch with up to date versions of Jetty, Jersey/RESTEasy, and JAXB; and
>> > > upgrade the different servers/components one at a time instead of all at
>> > > once. The DRPC server also uses the embedded jetty and exposes a REST
>> > > interface, and that is going to be a harder one to tease out so it should
>> > > probably be the last one to go.
>> > > - Bobby
>> > >
>> > > On Tuesday, February 23, 2016 3:40 AM, 伍翀(云邪) <wuchong...@alibaba-inc.com> wrote:
>> > >
>> > > Hi all, I’m planning to move UI/REST service and logviewer to Java, which
>> > > means that we need to pick some alternatives for ring and hiccup.
>> > > So the first thing is to pick up a REST framework.
>> > > For the REST APIs, I think Jersey is a good choice (RESTEasy is fine too).
>> > > It’s easy to develop with and has good performance.
>> > > Now logviewer uses hiccup to return HTML we build ourselves, but it’s hard
>> > > to debug and maintain. So in my opinion, it’s better to replace it with
>> > > static HTML + REST like regular UI.
>> > > Please let me know what you think.
>> > > - Jark Wu
[GitHub] storm pull request: STORM-1569: Adding option in nimbus to specify...
GitHub user Parth-Brahmbhatt opened a pull request:

    https://github.com/apache/storm/pull/1144

    STORM-1569: Adding option in nimbus to specify request queue size in config.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-1569

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1144.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1144

commit 24a87af2c4a1aa6f03c0ccdabccde8ecb322bd99
Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com>
Date: 2016-02-23T18:28:05Z

    STORM-1569: Adding option in nimbus to specify request queue size in config.
[jira] [Created] (STORM-1569) Allowing users to specify the nimbus thrift server queue size.
Parth Brahmbhatt created STORM-1569:

Summary: Allowing users to specify the nimbus thrift server queue size.
Key: STORM-1569
URL: https://issues.apache.org/jira/browse/STORM-1569
Project: Apache Storm
Issue Type: Improvement
Components: storm-core
Affects Versions: 0.10.0
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt
Fix For: 1.0.0

Currently the nimbus server in secure mode uses https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html backed by https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/SynchronousQueue.html. Please see https://github.com/apache/thrift/blob/0.9.2/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java#L132. This means that if all executor threads are busy serving requests and new requests come in, we will see RejectedExecutionExceptions in the logs once those requests have reached the retry limit. Instead we should allow the requests to be queued.

This patch allows the requests to be queued by replacing SynchronousQueue with https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ArrayBlockingQueue.html, with a default size of 10 requests, which should be large enough for most applications. Applications can modify this default by adding the config nimbus.queue.size to their storm.yaml and bouncing nimbus.
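The difference between the two queue types described in STORM-1569 can be reproduced with the JDK alone. The sketch below is an illustration under stated assumptions, not the patch: `QueueSizeSketch` and `countRejections` are hypothetical names, the pool is fixed at one thread to keep the behaviour deterministic, and the retry loop that Thrift's TThreadPoolServer performs before giving up is omitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueSizeSketch {
    // Submits `requests` tasks that all block until released, using a pool with a
    // single worker thread, and returns how many submissions were rejected.
    static int countRejections(BlockingQueue<Runnable> queue, int requests) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i < requests; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy, like a slow request
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) {
        // A SynchronousQueue has no capacity, so work is rejected while the worker is busy.
        System.out.println("SynchronousQueue rejections: "
                + countRejections(new SynchronousQueue<Runnable>(), 3));
        // A bounded queue of 10 (the default size named in the issue) absorbs the burst.
        System.out.println("ArrayBlockingQueue rejections: "
                + countRejections(new ArrayBlockingQueue<Runnable>(10), 3));
    }
}
```

With three submissions, the first occupies the only worker in both cases. The remaining two are rejected by the SynchronousQueue-backed pool and are queued by the ArrayBlockingQueue-backed one.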
[jira] [Resolved] (STORM-1147) Storm JDBCBolt should add validation to ensure either insertQuery or table name is specified and not both.
[ https://issues.apache.org/jira/browse/STORM-1147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Parth Brahmbhatt resolved STORM-1147.
Resolution: Fixed

> Storm JDBCBolt should add validation to ensure either insertQuery or table name is specified and not both.
>
> Key: STORM-1147
> URL: https://issues.apache.org/jira/browse/STORM-1147
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-jdbc
> Affects Versions: 0.10.0
> Reporter: Parth Brahmbhatt
> Assignee: Parth Brahmbhatt
> Priority: Trivial
> Fix For: 1.0.0
>
> The JDBCBolt takes either an insert query or a table name but does not do any
> validation check to ensure only one of the two options is provided. We should
> add a validation check and throw an exception with proper messaging to avoid
> confusion.
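The kind of check STORM-1147 asks for might look like the following sketch. It is not the storm-jdbc code: the class name `JdbcInsertConfigSketch`, the method `validate`, and the choice of `IllegalArgumentException` are all assumptions made for illustration. The rule itself, exactly one of the two options, comes from the issue.

```java
class JdbcInsertConfigSketch {
    // Returns normally only when exactly one of insertQuery / tableName is set.
    // Blank strings are treated the same as null (an assumption of this sketch).
    static void validate(String insertQuery, String tableName) {
        boolean hasQuery = insertQuery != null && !insertQuery.trim().isEmpty();
        boolean hasTable = tableName != null && !tableName.trim().isEmpty();
        if (hasQuery == hasTable) {
            throw new IllegalArgumentException(
                    "Specify exactly one of insertQuery or tableName, but got "
                            + (hasQuery ? "both" : "neither"));
        }
    }

    public static void main(String[] args) {
        validate("insert into user_details values (?, ?)", null); // accepted
        validate(null, "user_details");                           // accepted
        try {
            validate("insert into user_details values (?, ?)", "user_details");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running the check when the bolt is configured, rather than on the first tuple, would surface the mistake before the topology starts processing data.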
[GitHub] storm pull request: STORM-1539 - Improve Storm ACK-ing performance
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/1101#issuecomment-183113689 +1.
[GitHub] storm pull request: [STORM-1521] When using Kerberos login from ke...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/1064#issuecomment-182478517 Not sure how I missed that you were creating a singleton :-). +1.
[GitHub] storm pull request: [STORM-1521] When using Kerberos login from ke...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/1064#issuecomment-182469322 @dbahir Does the legacy provider take care of logging in only once internally?
[jira] [Resolved] (STORM-1521) When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed
[ https://issues.apache.org/jira/browse/STORM-1521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Parth Brahmbhatt resolved STORM-1521.
Resolution: Fixed
Fix Version/s: 2.0.0

> When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed
>
> Key: STORM-1521
> URL: https://issues.apache.org/jira/browse/STORM-1521
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-hbase
> Affects Versions: 0.10.0, 0.9.5
> Reporter: Dan Bahir
> Assignee: Dan Bahir
> Fix For: 2.0.0
>
> When logging in with a keytab, if the topology has more than one instance of
> an HBase bolt then the ticket will not be automatically renewed.
> Expected: The ticket will be automatically renewed and the bolt will be able
> to write to the database.
> Actual: The ticket is not renewed and the bolt loses access to HBase.
> Note: when there is only one bolt with one executor, it renews correctly.
> Exception in bolt is:
> 2015-12-18T09:41:13.862-0500 o.a.h.s.UserGroupInformation [ERROR] PriviledgedActionException as:u...@somewhere.com cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> 2015-12-18T09:41:13.862-0500 o.a.h.i.RpcClient [WARN] Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
> 2015-12-18T09:41:13.863-0500 o.a.h.i.RpcClient [ERROR] SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
[GitHub] storm pull request: STORM-1406: Add MQTT Support
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/991#issuecomment-173352336 I think you need to add this module to incubator-storm/storm-dist/binary/src/main/assembly/binary.xml . I am +1 once that is done.
[GitHub] storm pull request: STORM-1406: Add MQTT Support
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/991#issuecomment-173358421 +1.
[GitHub] storm pull request: [STORM-1449] Fix Kafka spout to maintain backw...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/994#issuecomment-173027654 We can either include this as a warning in the Release Notes or roll this change back. I am fine with including just a Release Note to warn all users trying to upgrade, unless others think rolling upgradability is important enough that we should only break it when people move between major versions.
[GitHub] storm pull request: [STORM-1176] Checkpoint window evaluated/expir...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/963#issuecomment-172122588 +1.
[GitHub] storm pull request: STORM-1199 : HDFS Spout Functionally complete....
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/936#issuecomment-171777228 @roshannaik Thanks for the patch. I have merged this request to master.
[jira] [Resolved] (STORM-1199) Create HDFS Spout
[ https://issues.apache.org/jira/browse/STORM-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Parth Brahmbhatt resolved STORM-1199.
Resolution: Fixed
Fix Version/s: 1.0.0

> Create HDFS Spout
>
> Key: STORM-1199
> URL: https://issues.apache.org/jira/browse/STORM-1199
> Project: Apache Storm
> Issue Type: New Feature
> Reporter: Roshan Naik
> Assignee: Roshan Naik
> Fix For: 1.0.0
> Attachments: HDFSSpoutforStorm v2.pdf, HDFSSpoutforStorm.pdf, hdfs-spout.1.patch
>
> Create an HDFS spout so that Storm can suck in data from files in a HDFS
> directory
[GitHub] storm pull request: [STORM-1175] State store for windowing operati...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/939#issuecomment-170755274 @arunmahadevan I am still +1, but the upmerge is failing for storm-core/src/jvm/org/apache/storm/topology/TopologyBuilder.java. Can you please upmerge one last time so that I can merge this into master?
[jira] [Created] (STORM-1423) storm UI in a secure env shows error even when credentials are present
Parth Brahmbhatt created STORM-1423:

Summary: storm UI in a secure env shows error even when credentials are present
Key: STORM-1423
URL: https://issues.apache.org/jira/browse/STORM-1423
Project: Apache Storm
Issue Type: Bug
Components: storm-core
Affects Versions: 0.10.1
Reporter: Parth Brahmbhatt
Assignee: Parth Brahmbhatt
Fix For: 0.11.0

storm UI in a secure env shows error even when credentials are present
[GitHub] storm pull request: STORM-1423: storm UI in a secure env shows err...
GitHub user Parth-Brahmbhatt opened a pull request:

    https://github.com/apache/storm/pull/980

    STORM-1423: storm UI in a secure env shows error even when credentials are present.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-1423

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/980.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #980

commit 2199ee89de21899ef7db31d2f4e88aad60a53843
Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com>
Date: 2015-12-16T00:05:04Z

    STORM-1423: storm UI in a secure env shows error even when credentials are present.
[GitHub] storm pull request: Storm 631: refactoring kafka connector code.
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/387#issuecomment-167640073 closing.
[GitHub] storm pull request: Modify jdbcClient
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/724#issuecomment-167639763 What jira is this associated with? I see some IP addresses in the pom, and even after looking at the change I can't understand what it is intended to do.
[GitHub] storm pull request: Storm 631: refactoring kafka connector code.
Github user Parth-Brahmbhatt closed the pull request at: https://github.com/apache/storm/pull/387
[GitHub] storm pull request: [STORM-1175] State store for windowing operati...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/939#issuecomment-166698659 @arunmahadevan Thanks for the patch. I think the missing piece is the how-to-use guide. You can file a follow-up jira for that. Overall it looks good to me, and once the comments are addressed, I am +1.
[GitHub] storm pull request: [STORM-1175] State store for windowing operati...
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/939#discussion_r48282204

    --- Diff: storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java ---
    @@ -0,0 +1,280 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.spout;
    +
    +import backtype.storm.Config;
    +import backtype.storm.state.KeyValueState;
    +import backtype.storm.state.StateFactory;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.Map;
    +import static backtype.storm.spout.CheckPointState.State.COMMITTED;
    +import static backtype.storm.spout.CheckPointState.State.COMMITTING;
    +import static backtype.storm.spout.CheckPointState.State.PREPARING;
    +
    +/**
    + * Emits checkpoint tuples which is used to save the state of the {@link backtype.storm.topology.IStatefulComponent}
    + * across the topology. If a topology contains Stateful bolts, Checkpoint spouts are automatically added
    + * to the topology. There is only one Checkpoint task per topology.
    + *
    + * Checkpoint spout stores its internal state in a {@link KeyValueState}. The state transitions are as follows.
    + *
    + *
    + * ROLLBACK(tx2)
    + * <- PREPARE(tx2) COMMIT(tx2)
    + * COMMITTED(tx1)-> PREPARING(tx2) --> COMMITTING(tx2) -> COMMITTED (tx2)
    + *
    + *
    + *
    + */
    +public class CheckpointSpout extends BaseRichSpout {
    +    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class);
    +
    +    public static final String CHECKPOINT_STREAM_ID = "$checkpoint";
    +    public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout";
    +    public static final String CHECKPOINT_FIELD_TXID = "txid";
    +    public static final String CHECKPOINT_FIELD_ACTION = "action";
    +    public static final String CHECKPOINT_ACTION_PREPARE = "prepare";
    +    public static final String CHECKPOINT_ACTION_COMMIT = "commit";
    +    public static final String CHECKPOINT_ACTION_ROLLBACK = "rollback";
    +    public static final String CHECKPOINT_ACTION_INITSTATE = "initstate";
    +
    +    private static final String TX_STATE_KEY = "__state";
    +    private static final int DEFAULT_CHECKPOINT_INTERVAL = 1000; // every sec
    +
    +    private TopologyContext context;
    +    private SpoutOutputCollector collector;
    +    private long lastCheckpointTs;
    +    private int checkpointInterval;
    +    private boolean recoveryStepInProgress;
    +    private boolean checkpointStepInProgress;
    +    private boolean recovering;
    +    private KeyValueState<String, CheckPointState> checkpointState;
    +
    +    @Override
    +    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +        open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context));
    +    }
    +
    +    // package access for unit test
    +    void open(TopologyContext context, SpoutOutputCollector collector,
    +              int checkpointInterval, KeyValueState<String, CheckPointState> checkpointState) {
    +        this.context = context;
    +        this.collector = collector;
    +        this.checkpointInterval = checkpointInterval;
    +        this.checkpointState = checkpointState;
    +        lastCheckpointTs = 0;
    +        recoveryStepInProgress = false;
    +        checkpointStepInProgress = false;
    +        recovering = true;
    +    }
    +
    +    @Override
    +    public void nextTuple() {
    +
[GitHub] storm pull request: [STORM-1175] State store for windowing operati...
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/939#discussion_r48279308

    --- Diff: storm-core/src/jvm/backtype/storm/spout/CheckPointState.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.spout;
    +
    +/**
    + * Captures the current state of the transaction in
    + * {@link CheckpointSpout}
    + */
    +public class CheckPointState {
    +    public long txid;
    --- End diff --

    Shouldn't these be private fields with getters?
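The review suggestion above (private fields with getters) could take roughly the following shape. This is a sketch only: `CheckPointStateSketch` is a hypothetical name, the three enum constants are taken from the static imports quoted in the CheckpointSpout diff, and making the fields `final` is an extra assumption that may not fit the real class if it has to be mutated or deserialized through a no-arg constructor.

```java
class CheckPointStateSketch {
    // The three states referenced by the static imports in CheckpointSpout.
    enum State { COMMITTED, COMMITTING, PREPARING }

    // Private and final here; the quoted diff shows txid as a public field.
    private final long txid;
    private final State state;

    CheckPointStateSketch(long txid, State state) {
        this.txid = txid;
        this.state = state;
    }

    long getTxid() {
        return txid;
    }

    State getState() {
        return state;
    }

    @Override
    public String toString() {
        return "CheckPointStateSketch{txid=" + txid + ", state=" + state + "}";
    }

    public static void main(String[] args) {
        CheckPointStateSketch s = new CheckPointStateSketch(2L, State.PREPARING);
        System.out.println(s); // callers read values through getters, not fields
    }
}
```

Hiding the fields lets the class change how it stores the transaction id or state later without breaking callers such as the spout.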
[GitHub] storm pull request: [STORM-1175] State store for windowing operati...
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/939#discussion_r48280341 --- Diff: storm-core/src/jvm/backtype/storm/spout/CheckpointSpout.java --- @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.spout; + +import backtype.storm.Config; +import backtype.storm.state.KeyValueState; +import backtype.storm.state.StateFactory; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import static backtype.storm.spout.CheckPointState.State.COMMITTED; +import static backtype.storm.spout.CheckPointState.State.COMMITTING; +import static backtype.storm.spout.CheckPointState.State.PREPARING; + +/** + * Emits checkpoint tuples which is used to save the state of the {@link backtype.storm.topology.IStatefulComponent} + * across the topology. If a topology contains Stateful bolts, Checkpoint spouts are automatically added + * to the topology. 
There is only one Checkpoint task per topology. + * + * Checkpoint spout stores its internal state in a {@link KeyValueState}. The state transitions are as follows. + * + * + * ROLLBACK(tx2) + * <- PREPARE(tx2) COMMIT(tx2) + * COMMITTED(tx1)-> PREPARING(tx2) --> COMMITTING(tx2) -> COMMITTED (tx2) + * + * + * + */ +public class CheckpointSpout extends BaseRichSpout { +private static final Logger LOG = LoggerFactory.getLogger(CheckpointSpout.class); + +public static final String CHECKPOINT_STREAM_ID = "$checkpoint"; +public static final String CHECKPOINT_COMPONENT_ID = "$checkpointspout"; +public static final String CHECKPOINT_FIELD_TXID = "txid"; +public static final String CHECKPOINT_FIELD_ACTION = "action"; +public static final String CHECKPOINT_ACTION_PREPARE = "prepare"; +public static final String CHECKPOINT_ACTION_COMMIT = "commit"; +public static final String CHECKPOINT_ACTION_ROLLBACK = "rollback"; +public static final String CHECKPOINT_ACTION_INITSTATE = "initstate"; + +private static final String TX_STATE_KEY = "__state"; +private static final int DEFAULT_CHECKPOINT_INTERVAL = 1000; // every sec + +private TopologyContext context; +private SpoutOutputCollector collector; +private long lastCheckpointTs; +private int checkpointInterval; +private boolean recoveryStepInProgress; +private boolean checkpointStepInProgress; +private boolean recovering; +private KeyValueState<String, CheckPointState> checkpointState; + +@Override +public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { +open(context, collector, loadCheckpointInterval(conf), loadCheckpointState(conf, context)); +} + +// package access for unit test +void open(TopologyContext context, SpoutOutputCollector collector, + int checkpointInterval, KeyValueState<String, CheckPointState> checkpointState) { +this.context = context; +this.collector = collector; +this.checkpointInterval = checkpointInterval; +this.checkpointState = checkpointState; +lastCheckpointTs = 0; 
+recoveryStepInProgress = false; +checkpointStepInProgress = false; +recovering = true; +} + +@Override +public void nextTuple() { +
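The state transitions listed in the javadoc above can be read as a small state machine. The following is only a sketch of that reading: the class name `CheckpointStateSketch` and its methods are hypothetical and are not the `CheckPointState` class from the pull request, and the rollback rule (a transaction still in PREPARING falls back to the previous committed transaction) is an assumption drawn from the diagram.

```java
import java.util.Objects;

// Hypothetical, simplified model of the transitions in the CheckpointSpout javadoc.
// This is NOT the CheckPointState class from the pull request.
final class CheckpointStateSketch {
    enum State { COMMITTED, PREPARING, COMMITTING }

    final long txid;
    final State state;

    CheckpointStateSketch(long txid, State state) {
        this.txid = txid;
        this.state = Objects.requireNonNull(state);
    }

    /** Normal path: COMMITTED(tx) -> PREPARING(tx+1) -> COMMITTING(tx+1) -> COMMITTED(tx+1). */
    CheckpointStateSketch advance() {
        switch (state) {
            case COMMITTED:
                return new CheckpointStateSketch(txid + 1, State.PREPARING);
            case PREPARING:
                return new CheckpointStateSketch(txid, State.COMMITTING);
            case COMMITTING:
                return new CheckpointStateSketch(txid, State.COMMITTED);
            default:
                throw new IllegalStateException("unknown state " + state);
        }
    }

    /** Assumed recovery path: PREPARING(tx) is rolled back to COMMITTED(tx-1). */
    CheckpointStateSketch rollback() {
        if (state != State.PREPARING) {
            throw new IllegalStateException("can only roll back from PREPARING, was " + state);
        }
        return new CheckpointStateSketch(txid - 1, State.COMMITTED);
    }

    @Override
    public String toString() {
        return state + "(tx" + txid + ")";
    }
}
```

Starting from `COMMITTED(tx1)`, three calls to `advance()` walk through `PREPARING(tx2)`, `COMMITTING(tx2)` and `COMMITTED(tx2)`, while `rollback()` from `PREPARING(tx2)` returns to `COMMITTED(tx1)`.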
[GitHub] storm pull request: STORM-1187 Support windowing based on tuple ts
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/900#issuecomment-164565516 Overall I am +1.
[GitHub] storm pull request: STORM-1187 Support windowing based on tuple ts
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/900#discussion_r47557478 --- Diff: docs/documentation/Windowing.md --- @@ -126,6 +126,96 @@ Time duration based tumbling window that tumbles after the specified time durati ``` +## Tuple timestamp and out of order tuples +By default the timestamp tracked in the window is the time when the tuple is processed by the bolt. The window calculations +are performed based on the processing timestamp. Storm has support for tracking windows based on the source generated timestamp. + +```java +/** +* Specify a field in the tuple that represents the timestamp as a long value. If this +* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown. +* +* @param fieldName the name of the field that contains the timestamp +*/ +public BaseWindowedBolt withTimestampField(String fieldName) +``` + +The value for the above `fieldName` will be looked up from the incoming tuple and considered for windowing calculations. +If the field is not present in the tuple an exception will be thrown. Along with the timestamp field name, a time lag parameter +can also be specified which indicates the max time limit for tuples with out of order timestamps. + +E.g. If the lag is 5 secs and a tuple `t1` arrived with timestamp `06:00:05` no tuples may arrive with tuple timestamp earlier than `06:00:00`. If a tuple +arrives with timestamp 05:59:59 after `t1` and the window has moved past `t1`, it will be treated as a late tuple and not processed. --- End diff -- Let's also document how users can find out the number of discarded tuples. In many cases it may also be useful to provide a handler for tuples being discarded, but I am fine with not including that in this patch.
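To make the lag example and the review suggestion concrete, here is a minimal sketch of a late-tuple check that also counts discarded tuples and calls a handler for each one. Everything in it is hypothetical (`LateTupleTrackerSketch`, `offer`, `discardedCount`); it is not the windowing code in the pull request. It also simplifies the documented rule: a tuple is treated as late as soon as its timestamp is earlier than the highest timestamp seen minus the lag, without modelling whether the window has already moved on.

```java
import java.util.function.LongConsumer;

// Hypothetical sketch: tracks the highest timestamp seen and rejects tuples
// that are older than (highest timestamp - max lag).
final class LateTupleTrackerSketch {
    private final long maxLagMs;
    private final LongConsumer lateHandler;
    private boolean seenAny = false;
    private long maxTimestampSeen = 0L;
    private long discarded = 0L;

    LateTupleTrackerSketch(long maxLagMs, LongConsumer lateHandler) {
        if (maxLagMs < 0) {
            throw new IllegalArgumentException("maxLagMs must be >= 0");
        }
        this.maxLagMs = maxLagMs;
        this.lateHandler = lateHandler;
    }

    /** Returns true if the tuple is accepted, false if it is discarded as late. */
    boolean offer(long timestampMs) {
        if (seenAny && timestampMs < maxTimestampSeen - maxLagMs) {
            discarded++;
            lateHandler.accept(timestampMs); // let the caller log or redirect the late tuple
            return false;
        }
        maxTimestampSeen = seenAny ? Math.max(maxTimestampSeen, timestampMs) : timestampMs;
        seenAny = true;
        return true;
    }

    /** Number of tuples rejected so far; this is the metric the review asks to expose. */
    long discardedCount() {
        return discarded;
    }
}
```

With a lag of 5000 ms, offering `06:00:05` and then `05:59:59` (as milliseconds since midnight) rejects the second tuple and raises `discardedCount()` to 1, while a tuple stamped exactly `06:00:00` is still accepted.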
[GitHub] storm pull request: STORM-1179: Create Maven Profiles for Integrat...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/930#issuecomment-164586572 +1.
[GitHub] storm pull request: [STORM-1383] Avoid supervisor crashing if nimb...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/938#issuecomment-163715625 +1.
[jira] [Created] (STORM-1381) Client side topology submission hook.
Parth Brahmbhatt created STORM-1381: --- Summary: Client side topology submission hook. Key: STORM-1381 URL: https://issues.apache.org/jira/browse/STORM-1381 Project: Apache Storm Issue Type: New Feature Components: storm-core Affects Versions: 0.11.0 Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Priority: Trivial Fix For: 0.11.0 A client side hook is supposed to be invoked when a user submits the topology using TopologySubmitter. We already have a nimbus side hook for all the topology actions; however, it is only suitable when users don't need to inspect the topology being submitted or the classes that make up the topology (spouts and bolts), because on the nimbus side these classes are not available on the class path. As a concrete example, at Hortonworks we wanted to integrate Storm with Atlas to provide complete lineage of data even when it passes through a Storm topology. Atlas needed to look inside the topology components (e.g. the kafka spout to figure out which topic the data is being pulled from, or the hbase bolt to figure out which cluster and table the data is being pushed into) to give a meaningful lineage. We originally proposed that they use the server side hook, but with that they had to download the user uploaded jar and add it to the class path dynamically, or spin up a new JVM whose output would then be read by the Atlas integration hook. The client side hook is supposed to make this easy when the topology itself needs to be examined. We are using this in our internal repo for Atlas integration.
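A rough sketch of why a client side hook helps with the lineage use case: on the submitting client the component classes are on the class path, so a hook can inspect component instances directly. All names below (`SubmitHookSketch`, `TopicSourceSketch`, `KafkaLikeSpoutSketch`, `LineageHookSketch`) are hypothetical illustrations and are not the interface introduced by this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical hook contract: called on the client with the components being submitted. */
interface SubmitHookSketch {
    void onSubmit(String topologyName, Map<String, Object> components);
}

/** Hypothetical marker for components that read from a named topic. */
interface TopicSourceSketch {
    String topic();
}

/** Stand-in for a spout that pulls data from a topic. */
final class KafkaLikeSpoutSketch implements TopicSourceSketch {
    private final String topic;

    KafkaLikeSpoutSketch(String topic) {
        this.topic = topic;
    }

    @Override
    public String topic() {
        return topic;
    }
}

/** Collects one lineage entry per component that exposes a topic. */
final class LineageHookSketch implements SubmitHookSketch {
    final List<String> lineage = new ArrayList<>();

    @Override
    public void onSubmit(String topologyName, Map<String, Object> components) {
        for (Map.Entry<String, Object> e : components.entrySet()) {
            // Possible on the client because the component class is loadable here.
            if (e.getValue() instanceof TopicSourceSketch) {
                String topic = ((TopicSourceSketch) e.getValue()).topic();
                lineage.add(topologyName + "/" + e.getKey() + " reads topic " + topic);
            }
        }
    }
}
```

On the nimbus side the same inspection would first require loading the user's jar, which is the extra work the issue description is trying to avoid.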
[GitHub] storm pull request: STORM-1381:Adding client side submission hook ...
GitHub user Parth-Brahmbhatt opened a pull request: https://github.com/apache/storm/pull/935 STORM-1381:Adding client side submission hook along with LocalCluster… …changes to run tests. Conflicts: storm-core/src/jvm/backtype/storm/Config.java Topology hook should work in local mode too. LocalCluster changes to run nimbus thrift server. StormSubmitter hook changes along with LocalCluster changes to run tests. Conflicts: storm-core/src/clj/backtype/storm/daemon/nimbus.clj storm-core/src/jvm/backtype/storm/StormSubmitter.java storm-core/src/jvm/backtype/storm/utils/Utils.java You can merge this pull request into a Git repository by running: $ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-1381 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/935.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #935 commit c5561cfdf520126ad8e5dd8ae39c025eb6041518 Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com> Date: 2015-11-12T20:08:58Z STORM-1381:Adding client side submission hook along with LocalCluster changes to run tests. Conflicts: storm-core/src/jvm/backtype/storm/Config.java Topology hook should work in local mode too. LocalCluster changes to run nimbus thrift server. StormSubmitter hook changes along with LocalCluster changes to run tests. Conflicts: storm-core/src/clj/backtype/storm/daemon/nimbus.clj storm-core/src/jvm/backtype/storm/StormSubmitter.java storm-core/src/jvm/backtype/storm/utils/Utils.java
[GitHub] storm pull request: STORM-1030. Hive Connector Fixes.
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/871#issuecomment-162580970 Still +1.
[GitHub] storm pull request: STORM-1040. SQL support for Storm
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/919#issuecomment-162082605 +1.
[GitHub] storm pull request: STORM-1345: UpdateTopology API and implementat...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/922#issuecomment-162069596 @revans2 @unsleepy22 I did not know JStorm has it, and I haven't had a chance to look at the dist cache. My goal was to allow users to update the topology, config, or jar and to support workers being restarted in a rolling fashion. When the topology itself is changed we can still do a rolling restart, and that is the part where I have a TODO that I need to finish. I thought all of those required workers to bounce; I am not sure which parts can work without bouncing the worker. The code I have commented is about making update a state just like rebalance/kill/activate, and I am still not convinced that it needs to be a state of its own. I am fine with waiting till we are done with JStorm integration as long as JStorm achieves the same goals. If others think that this is an important enough feature that we should include it in the release before we do a feature lockdown, I can upmerge with dist cache and use it as @revans2 suggested.
[GitHub] storm pull request: STORM-1345: UpdateTopology API and implementat...
GitHub user Parth-Brahmbhatt opened a pull request: https://github.com/apache/storm/pull/922 STORM-1345: UpdateTopology API and implementation. No unit tests added. Update allows changing the config, the jar (if some dependency is updated without the topology changing in any way), or the topology itself. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-1345 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/922.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #922 commit 23e961a34ce500a2a101ef7241fa006b6d29 Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com> Date: 2015-12-04T06:08:02Z STORM-1345: UpdateTopology API and implementation.
[jira] [Commented] (STORM-1346) upgrade topology CLI tool
[ https://issues.apache.org/jira/browse/STORM-1346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15040780#comment-15040780 ] Parth Brahmbhatt commented on STORM-1346: - Code available in https://github.com/Parth-Brahmbhatt/incubator-storm/tree/STORM-1346 but is blocked until https://github.com/apache/storm/pull/922 gets merged. > upgrade topology CLI tool > -- > > Key: STORM-1346 > URL: https://issues.apache.org/jira/browse/STORM-1346 > Project: Apache Storm > Issue Type: Sub-task > Components: storm-core > Reporter: Parth Brahmbhatt >Assignee: Parth Brahmbhatt >
[jira] [Created] (STORM-1346) upgrade topology CLI tool
Parth Brahmbhatt created STORM-1346: --- Summary: upgrade topology CLI tool Key: STORM-1346 URL: https://issues.apache.org/jira/browse/STORM-1346 Project: Apache Storm Issue Type: Sub-task Components: storm-core Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt
[jira] [Created] (STORM-1345) Thrift, nimbus ,zookeeper, supervisor and worker changes to support update topology.
Parth Brahmbhatt created STORM-1345: --- Summary: Thrift, nimbus ,zookeeper, supervisor and worker changes to support update topology. Key: STORM-1345 URL: https://issues.apache.org/jira/browse/STORM-1345 Project: Apache Storm Issue Type: Sub-task Components: storm-core Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt
[jira] [Created] (STORM-1347) ui changes to display the topology version.
Parth Brahmbhatt created STORM-1347: --- Summary: ui changes to display the topology version. Key: STORM-1347 URL: https://issues.apache.org/jira/browse/STORM-1347 Project: Apache Storm Issue Type: Sub-task Components: storm-core Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt
[GitHub] storm pull request: [STORM-126] Add Lifecycle support API for work...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/884#issuecomment-158451109 I'm +1 too.
[GitHub] storm pull request: STORM-1221. Create a common interface for all ...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/895#issuecomment-158230675 +1.
[GitHub] storm pull request: [STORM-126] Add Lifecycle support API for work...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/884#issuecomment-157749683 Thanks for the contribution. Is there any reason you decided to make this hook part of the serialized topology? If you look at other examples, like the nimbus hook (not a great example, as it is on the nimbus side and not the worker, but still close), we ask the user to provide the fully qualified class name as a config option, create an instance of it using reflection, and invoke prepare (start in your case) or cleanup on that instance. The only advantage of putting this in the topology is that users can provide fully serialized objects, which can be initialized with constructor args or in any other way that relies on instance variable initialization, but I don't see that as a huge upside. On the other hand, a consistent way to implement all hooks will make the code easier to read and reason about.
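The config-plus-reflection pattern described in this comment can be sketched as follows. The interface, the config key `example.hook.class`, and the class names are all hypothetical and chosen for illustration; they are not Storm's actual hook API or config names.

```java
import java.util.Map;

/** Hypothetical hook contract with the prepare/cleanup lifecycle mentioned in the comment. */
interface LifecycleHookSketch {
    void prepare(Map<String, Object> conf);

    void cleanup();
}

/** Trivial implementation that records which lifecycle methods were called. */
final class RecordingHookSketch implements LifecycleHookSketch {
    boolean prepared = false;
    boolean cleanedUp = false;

    public RecordingHookSketch() {
        // A no-arg constructor is required because the loader instantiates by reflection.
    }

    @Override
    public void prepare(Map<String, Object> conf) {
        prepared = true;
    }

    @Override
    public void cleanup() {
        cleanedUp = true;
    }
}

final class HookLoaderSketch {
    // Hypothetical config key; a real system would define its own.
    static final String HOOK_CLASS_KEY = "example.hook.class";

    /** Reads the class name from the config, instantiates it reflectively, and calls prepare. */
    static LifecycleHookSketch load(Map<String, Object> conf) {
        Object className = conf.get(HOOK_CLASS_KEY);
        if (className == null) {
            throw new IllegalArgumentException("missing config key " + HOOK_CLASS_KEY);
        }
        try {
            Class<?> clazz = Class.forName(className.toString());
            LifecycleHookSketch hook =
                    (LifecycleHookSketch) clazz.getDeclaredConstructor().newInstance();
            hook.prepare(conf);
            return hook;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not create hook " + className, e);
        }
    }
}
```

The trade-off matches the comment: the hook can only be configured through the conf map passed to `prepare`, not through constructor arguments, but every hook is wired up the same way.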
[jira] [Commented] (STORM-1187) Support for late and out of order events in time based windows
[ https://issues.apache.org/jira/browse/STORM-1187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15011207#comment-15011207 ] Parth Brahmbhatt commented on STORM-1187: - [~arunmahadevan] Do you want to post your design doc for review? > Support for late and out of order events in time based windows > -- > > Key: STORM-1187 > URL: https://issues.apache.org/jira/browse/STORM-1187 > Project: Apache Storm > Issue Type: Sub-task >Reporter: Arun Mahadevan >Assignee: Arun Mahadevan > > Right now the time based windows use the timestamp when the tuple is > received by the bolt. > However there are use cases where the tuples can be processed based on the > time when they are actually generated vs the time when they are received. So > we need to add support for processing events with a time lag and also have > some way to specify and read tuple timestamps.
[jira] [Resolved] (STORM-1098) Storm Nimbus Hook
[ https://issues.apache.org/jira/browse/STORM-1098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt resolved STORM-1098. - Resolution: Fixed Fix Version/s: 0.11.0 > Storm Nimbus Hook > - > > Key: STORM-1098 > URL: https://issues.apache.org/jira/browse/STORM-1098 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core >Reporter: Sriharsha Chintalapani >Assignee: Parth Brahmbhatt > Fix For: 0.11.0 > > > Apache Atlas provides governance services and also lineage. It will be great > if we can capture the topology changes as part of Apache Atlas so that user > can see how the topology changed over the period of time. > Storm has ITaskHook but this is for topology components like spout & bolt. > Similar to ITaskHook we should provide a nimbus hook that will allow > pluggable implementations to run; nimbus will execute them on any > topology operation like upload jar, download, activate, deactivate, kill, > etc.
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/862#issuecomment-156209629 Venketesh did give some comments. I am going to merge this in, given that the comments would not block them.
[GitHub] storm pull request: STORM-1030. Hive Connector Fixes.
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/871#issuecomment-155269316 Overall I am +1, with a couple of clarifying questions.
[GitHub] storm pull request: STORM-1030. Hive Connector Fixes.
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/871#discussion_r44365442 --- Diff: external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java --- @@ -87,7 +87,7 @@ public void prepare(Map conf, TopologyContext topologyContext, OutputCollector c } } this.collector = collector; -allWriters = new HashMap<HiveEndPoint,HiveWriter>(); +allWriters = new ConcurrentHashMap<HiveEndPoint,HiveWriter>(); --- End diff -- any reason to make it concurrent?
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/862#discussion_r44304729 --- Diff: storm-core/test/jvm/backtype/storm/nimbus/InMemoryTopologyAcitonNotifier.java --- @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.nimbus; + +import java.util.Map; +import java.util.LinkedList; +import java.util.List; +import java.util.HashMap; + +public class InMemoryTopologyAcitonNotifier implements ITopologyActionNotifierPlugin { --- End diff -- I don't like this class either. I tried just stubbing it out, but given that the config is just a string and the actual instance is created using reflection, my only option was to stub notify-topology-action-listener, which is the method I am trying to test. I also tried stubbing nimbus-data, which holds the reference to the actual instance, but for one reason or another nimbus just failed to start with that stubbing.
[GitHub] storm pull request: [STORM-902] Simple Log Search
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/848#issuecomment-155133328 Hi, I am +1 on this patch. Sorry, it took me longer to review than I expected. Thanks a lot for your contributions.
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/862#discussion_r44373750 --- Diff: storm-core/test/jvm/backtype/storm/nimbus/InMemoryTopologyAcitonNotifier.java --- @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.nimbus; + +import java.util.Map; +import java.util.LinkedList; +import java.util.List; +import java.util.HashMap; + +public class InMemoryTopologyAcitonNotifier implements ITopologyActionNotifierPlugin { + +//static to ensure eventhough the class is created using reflection we can still get +//the topology to actions +private static final Map<String, LinkedList> topologyToActions = new HashMap<>(); + + +@Override +public void prepare(Map StormConf) { +//no-op +} + +@Override +public synchronized void notify(String topologyName, String action) { --- End diff -- This is not invoked in the critical (tuple execution) path, so I think we don't really care if this method is synchronized. Do you anticipate huge latencies here?
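For context, a self-contained sketch of this kind of in-memory test notifier is shown below. The names `ActionNotifierSketch` and `InMemoryNotifierSketch` are hypothetical stand-ins and not the classes in the diff. One deliberate difference, stated as a design choice rather than as what the patch does: the sketch locks on the shared static map instead of using a `synchronized` instance method, because an instance-level lock does not serialize access from different instances to static state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the plugin interface quoted in the diff. */
interface ActionNotifierSketch {
    void prepare(Map<String, Object> conf);

    void notify(String topologyName, String action);
}

final class InMemoryNotifierSketch implements ActionNotifierSketch {
    // Static so that a test can read the recorded actions even though the
    // instance under test was created elsewhere (for example via reflection).
    private static final Map<String, List<String>> ACTIONS = new HashMap<>();

    @Override
    public void prepare(Map<String, Object> conf) {
        // no-op
    }

    @Override
    public void notify(String topologyName, String action) {
        synchronized (ACTIONS) { // lock shared by all instances
            ACTIONS.computeIfAbsent(topologyName, k -> new ArrayList<>()).add(action);
        }
    }

    /** Returns a copy of the actions recorded for the given topology, in call order. */
    static List<String> actionsFor(String topologyName) {
        synchronized (ACTIONS) {
            List<String> recorded = ACTIONS.get(topologyName);
            return recorded == null ? new ArrayList<>() : new ArrayList<>(recorded);
        }
    }
}
```

A test can then trigger topology actions and assert on `InMemoryNotifierSketch.actionsFor(name)` without holding a reference to the notifier instance.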
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/862#discussion_r44373716 --- Diff: storm-core/src/jvm/backtype/storm/nimbus/ITopologyActionNotifierPlugin.java --- @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.nimbus; + +import java.util.Map; + +/** + * A plugin interface that gets invoked any time there is an action for a topology. + */ +public interface ITopologyActionNotifierPlugin { +/** + * Called once during nimbus initialization. + * @param StormConf + */ +void prepare(Map StormConf); + +/** + * When a new actions is executed for a topology, this method will be called. + * @param topologyName + * @param action + */ +void notify(String topologyName, String action); --- End diff -- They are not. Are you seeing TopologyId somewhere? I will double check whether I incorrectly named some variable. Id is a unique identifier which can change when a topology is killed and re-submitted, whereas the name can remain the same.
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/862#discussion_r44373855 --- Diff: storm-core/src/jvm/backtype/storm/nimbus/ITopologyActionNotifierPlugin.java --- @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.nimbus; + +import java.util.Map; + +/** + * A plugin interface that gets invoked any time there is an action for a topology. + */ +public interface ITopologyActionNotifierPlugin { +/** + * Called once during nimbus initialization. + * @param StormConf + */ +void prepare(Map StormConf); + +/** + * When a new actions is executed for a topology, this method will be called. + * @param topologyName + * @param action + */ +void notify(String topologyName, String action); --- End diff -- I actually thought about making it an Enum, but I think that will make it less flexible. I have no personal preference though, so if you think an Enum makes things easier to reason about, I can change that.
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/862#discussion_r44373870 --- Diff: storm-core/test/jvm/backtype/storm/nimbus/InMemoryTopologyAcitonNotifier.java --- @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.nimbus; + +import java.util.Map; +import java.util.LinkedList; +import java.util.List; +import java.util.HashMap; + +public class InMemoryTopologyAcitonNotifier implements ITopologyActionNotifierPlugin { --- End diff -- Fixed.
[GitHub] storm pull request: [STORM-1185] replace nimbus.host with nimbus.s...
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/865#discussion_r44204822

--- Diff: conf/storm.yaml.example ---
@@ -19,7 +19,7 @@
 # - "server1"
 # - "server2"
 #
-# nimbus.host: "nimbus"
+# nimbus.seeds: ["localhost"]
--- End diff --

Can you instead use values that are valid in a single-node setup? With the current values, anyone running Storm locally will be forced to update this setting. For example:

    nimbus.seeds: ["localhost", "127.0.0.1"]
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
Github user Parth-Brahmbhatt commented on the pull request:

    https://github.com/apache/storm/pull/862#issuecomment-154580884

    Upmerged. I will merge the patch once someone from the Atlas team reviews it and confirms that it is sufficient for them.
[GitHub] storm pull request: [STORM-902] Simple Log Search
Github user Parth-Brahmbhatt commented on the pull request:

    https://github.com/apache/storm/pull/848#issuecomment-154464749

    @zhuoliu Sorry, I did a partial review and then got distracted. Give me until the end of today and I will add more comments.
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/862#discussion_r44189170

--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -124,6 +124,8 @@
      :id->sched-status (atom {})
      :cred-renewers (AuthUtils/GetCredentialRenewers conf)
      :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
+     :nimbus-topology-action-notifier (when-not (clojure.string/blank? (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN))
+                                        (new-instance (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN)))
--- End diff --

Fixed.
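For readers less familiar with Clojure, the added lines in the diff above instantiate the notifier plugin only when a class name is configured. A rough Java rendering of that guard might look like the following. `PluginLoader` and `loadIfConfigured` are hypothetical names for this sketch, and the config key is passed in by the caller rather than assumed; this is not Storm's own loading code.

```java
import java.util.Map;

// Hypothetical helper mirroring the when-not/blank? guard in the diff.
class PluginLoader {
    // Returns a new instance of the class named under `key`, or null when the
    // setting is absent, not a string, or blank.
    static Object loadIfConfigured(Map<String, Object> conf, String key) {
        Object value = conf.get(key);
        if (!(value instanceof String) || ((String) value).trim().isEmpty()) {
            return null;
        }
        try {
            // Requires a public no-argument constructor on the plugin class.
            return Class.forName((String) value).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate plugin " + value, e);
        }
    }
}
```

Returning null for "not configured" matches what the Clojure `when-not` form yields, which is why the caller in the next diff checks for nil before notifying.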
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/862#discussion_r44189187

--- Diff: storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---
@@ -814,6 +816,11 @@
          (.assignSlots inimbus topologies)))
      (log-message "not a leader, skipping assignments")))
 
+(defn notify-topology-action-listener [nimbus storm-id action]
+  (let [topology-action-notifier (:nimbus-topology-action-notifier nimbus)]
+    (when (not-nil? topology-action-notifier)
+      (.notify topology-action-notifier storm-id action
--- End diff --

Fixed.
[GitHub] storm pull request: [STORM-902] Simple Log Search
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/848#discussion_r44042484 --- Diff: docs/documentation/ui-rest-api.md --- @@ -172,6 +172,27 @@ Sample response: } ``` +### /api/v1/history/summary (GET) + +Returns a list of topology ID submitted by the current user. + +Response fields: + +|Field |Value | Description| +|--- |---|--- +|topo-history| List| List of Topologies' IDs| + +Sample response: + +```json +{ +"topo-history":[ +{"host":"wc6-1-1446571009"}, --- End diff -- I am confused: if this is supposed to return a list of topology IDs, why does the response contain a list of maps with the host name as the key?
Re: [Discusson] Storm System Tests
+1 for different targets.

Thanks
Parth

On 11/5/15, 11:40 AM, "Parth Brahmbhatt" <pbrahmbh...@hortonworks.com> wrote:

>Our internal tests do things like that but as Harsha mentioned It is kind
>of tightly couple with our infra.
>
>I think if we can agree on using DuckTape (or some other framework) we
>can move to writing some simple test cases.
>
>Thanks
>Parth
>
>From: Bobby Evans <ev...@yahoo-inc.com>
>Reply-To: Bobby Evans <ev...@yahoo-inc.com>
>Date: Thursday, November 5, 2015 at 12:34 PM
>To: Parth Brahmbhatt <pbrahmbh...@hortonworks.com>,
>"dev@storm.apache.org" <dev@storm.apache.org>
>Subject: Re: [Discusson] Storm System Tests
>
>If you have a patch that just brings up a very small cluster with 2
>nimbus instances and shoots things, that would be really great.
>
>- Bobby
>
>On Thursday, November 5, 2015 1:29 PM, Parth Brahmbhatt
><pbrahmbh...@hortonworks.com> wrote:
>
>It will be good to add a basic test suite before 0.11 release at least to
>test out things like HA where we should really have some chaos monkey
>testing. From all the options I have seen/used duck tape seems to be the
>best option as of now.
>
>Thanks
>Parth
>
>On 11/5/15, 11:23 AM, "Bobby Evans" <ev...@yahoo-inc.com.INVALID> wrote:
>
>>These would be run frequently but not necessarily a part of CI initially.
>>
Re: [Discusson] Storm System Tests
Our internal tests do things like that, but as Harsha mentioned, they are tightly coupled with our infra.

I think if we can agree on using DuckTape (or some other framework), we can move on to writing some simple test cases.

Thanks
Parth

From: Bobby Evans <ev...@yahoo-inc.com>
Reply-To: Bobby Evans <ev...@yahoo-inc.com>
Date: Thursday, November 5, 2015 at 12:34 PM
To: Parth Brahmbhatt <pbrahmbh...@hortonworks.com>, "dev@storm.apache.org" <dev@storm.apache.org>
Subject: Re: [Discusson] Storm System Tests

If you have a patch that just brings up a very small cluster with 2 nimbus instances and shoots things, that would be really great.

- Bobby

On Thursday, November 5, 2015 1:29 PM, Parth Brahmbhatt <pbrahmbh...@hortonworks.com> wrote:

It will be good to add a basic test suite before 0.11 release at least to test out things like HA where we should really have some chaos monkey testing. From all the options I have seen/used duck tape seems to be the best option as of now.

Thanks
Parth

On 11/5/15, 11:23 AM, "Bobby Evans" <ev...@yahoo-inc.com.INVALID> wrote:

>These would be run frequently but not necessarily a part of CI initially.
>
Re: [Discusson] Storm System Tests
It would be good to add a basic test suite before the 0.11 release, at least to test things like HA, where we should really have some chaos monkey testing. Of all the options I have seen or used, DuckTape seems to be the best one as of now.

Thanks
Parth

On 11/5/15, 11:23 AM, "Bobby Evans" wrote:

>These would be run frequently but not necessarily a part of CI initially.
>
[jira] [Commented] (STORM-1098) Storm Nimbus Hook
[ https://issues.apache.org/jira/browse/STORM-1098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14992692#comment-14992692 ]

Parth Brahmbhatt commented on STORM-1098:
-----------------------------------------

[~svenkat] Given that you are the reporter for ATLAS-183 and ATLAS-181, please review the interface and let me know if this will suffice for Atlas integration.

> Storm Nimbus Hook
> -----------------
>
> Key: STORM-1098
> URL: https://issues.apache.org/jira/browse/STORM-1098
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Parth Brahmbhatt
>
> Apache Atlas provides governance services and also lineage. It will be great
> if we can capture the topology changes as part of Apache Atlas so that users
> can see how the topology changed over time.
> Storm has ITaskHook, but this is for topology components like spouts and bolts.
> Similar to ITaskHook, we should provide a nimbus hook that allows pluggable
> implementations to run; nimbus will execute it on any topology operation
> such as upload jar, download, activate, deactivate, kill, etc.
[GitHub] storm pull request: STORM-1098: Nimbus hook for topology actions.
GitHub user Parth-Brahmbhatt opened a pull request:

    https://github.com/apache/storm/pull/862

    STORM-1098: Nimbus hook for topology actions.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-1098

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/862.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #862

commit e49e8c52e5f8814e8ea89c6c58347913ed313fde
Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com>
Date: 2015-11-05T20:55:13Z

    STORM-1098: Nimbus hook for topology actions.
[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...
Github user Parth-Brahmbhatt commented on the pull request:

    https://github.com/apache/storm/pull/854#issuecomment-153864538

    I agree with @revans2, let's maintain backward compatibility.
[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/855#discussion_r43940643 --- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java --- @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.windowing; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +import static backtype.storm.topology.base.BaseWindowedBolt.Count; +import static backtype.storm.topology.base.BaseWindowedBolt.Duration; + +/** + * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks + * on expiry of events or activation of the window due to {@link TriggerPolicy}. + * + * @param the type of event in the window. 
+ */ +public class WindowManager implements TriggerHandler { +private static final Logger LOG = LoggerFactory.getLogger(WindowManager.class); + +/** + * Expire old events every EXPIRE_EVENTS_THRESHOLD to + * keep the window size in check. + */ +public static final int EXPIRE_EVENTS_THRESHOLD = 100; + +private WindowLifecycleListener windowLifecycleListener; +private ConcurrentLinkedQueue<Event> window; +private EvictionPolicy evictionPolicy; +private TriggerPolicy triggerPolicy; +private List expiredEvents; +private Set<Event> prevWindowEvents; +private AtomicInteger eventsSinceLastExpiry; +private ReentrantLock lock; + +public WindowManager(WindowLifecycleListener lifecycleListener) { +windowLifecycleListener = lifecycleListener; +window = new ConcurrentLinkedQueue<>(); +expiredEvents = new ArrayList<>(); +prevWindowEvents = new HashSet<>(); +eventsSinceLastExpiry = new AtomicInteger(); +lock = new ReentrantLock(true); +} + +public void setWindowLength(Count count) { +this.evictionPolicy = new CountEvictionPolicy<>(count.value); +} + +public void setWindowLength(Duration duration) { +this.evictionPolicy = new TimeEvictionPolicy<>(duration.value); +} + +public void setSlidingInterval(Count count) { +this.triggerPolicy = new CountTriggerPolicy<>(count.value, this); +} + +public void setSlidingInterval(Duration duration) { +this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this); +} + +/** + * Add an event into the window, with {@link System#currentTimeMillis()} as + * the tracking ts. + * + * @param event the event to add + */ +public void add(T event) { +add(event, System.currentTimeMillis()); +} + +/** + * Add an event into the window, with the given ts as the tracking ts. + * + * @param event the event to track + * @param tsthe timestamp + */ +public void add(T event, long ts) { +Event windowEvent = new EventImpl(event, ts); +window.add(windowEvent); +track(windowEvent); +compactWindow(); --- End diff -- not sure why we need this? 
Why not just rely on onTrigger to expire events? Also, don't we need locking here as well, to ensure that no new events can be added while onTrigger is firing, given that an add would change the events in the window? In addition, I think it will give us undefined behavior: at line 187 we use an iterator to remove an element from the window list while this add method can add an element to it.
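The locking concern raised above can be sketched as follows. This is a deliberately simplified, hypothetical `LockedWindow` class, not the `WindowManager` from the PR: it only shows `add` and `onTrigger` sharing one lock, so that eviction and the snapshot handed to the listener never interleave with a concurrent add. The count-based eviction is an assumption made to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified window: add() and onTrigger() share one lock.
class LockedWindow<T> {
    private final Deque<T> window = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final int maxSize;

    LockedWindow(int maxSize) {
        this.maxSize = maxSize;
    }

    // Adds an event; blocks while a trigger is being processed.
    void add(T event) {
        lock.lock();
        try {
            window.addLast(event);
        } finally {
            lock.unlock();
        }
    }

    // Evicts the oldest events beyond maxSize, then returns a stable snapshot.
    List<T> onTrigger() {
        lock.lock();
        try {
            while (window.size() > maxSize) {
                window.removeFirst();
            }
            return new ArrayList<>(window);
        } finally {
            lock.unlock();
        }
    }
}
```

Expiring only inside `onTrigger` also removes the need for a separate compaction step in `add`, at the cost of letting the window grow between triggers.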
[jira] [Assigned] (STORM-1098) Storm Nimbus Hook
[ https://issues.apache.org/jira/browse/STORM-1098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Parth Brahmbhatt reassigned STORM-1098:
---------------------------------------

Assignee: Parth Brahmbhatt (was: Sriharsha Chintalapani)

> Storm Nimbus Hook
> -----------------
>
> Key: STORM-1098
> URL: https://issues.apache.org/jira/browse/STORM-1098
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Parth Brahmbhatt
>
> Apache Atlas provides governance services and also lineage. It will be great
> if we can capture the topology changes as part of Apache Atlas so that users
> can see how the topology changed over time.
> Storm has ITaskHook, but this is for topology components like spouts and bolts.
> Similar to ITaskHook, we should provide a nimbus hook that allows pluggable
> implementations to run; nimbus will execute it on any topology operation
> such as upload jar, download, activate, deactivate, kill, etc.
[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43932253

--- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.topology.base;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IWindowedBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.windowing.TupleWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class BaseWindowedBolt implements IWindowedBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
+
+    private transient Map<String, Object> windowConfiguration;
--- End diff --

I am not sure why we can't just have instance variables for all the configs. Why do they need to go into a transient map?
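As background for the question about the transient map, here is a small sketch of what `transient` does under plain Java serialization: ordinary instance fields survive a round trip, transient ones come back as null. `ConfiguredBolt` and its fields are hypothetical names for this example, and it assumes the object is shipped with standard Java serialization; it does not reproduce `BaseWindowedBolt`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical bolt-like class with one plain field and one transient map.
class ConfiguredBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    int windowLength; // plain instance field: written to the stream
    transient Map<String, Object> windowConfiguration = new HashMap<>(); // skipped

    // Serializes and deserializes the object in memory.
    static ConfiguredBolt roundTrip(ConfiguredBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfiguredBolt) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After `roundTrip`, `windowLength` keeps its value while `windowConfiguration` is null, because field initializers are not re-run on deserialization. Whether that matters for the PR depends on when the configuration is read, which the excerpt above does not show.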
[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/855#discussion_r43933512 --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java --- @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.topology; + +import backtype.storm.Config; +import backtype.storm.task.IOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import backtype.storm.windowing.TupleWindowImpl; +import backtype.storm.windowing.WindowLifecycleListener; +import backtype.storm.windowing.WindowManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static backtype.storm.topology.base.BaseWindowedBolt.Count; +import static backtype.storm.topology.base.BaseWindowedBolt.Duration; + +/** + * An {@link IWindowedBolt} wrapper that does the windowing of tuples. 
+ */ +public class WindowedBoltExecutor implements IRichBolt { +private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class); + +private IWindowedBolt bolt; +private transient WindowedOutputCollector windowedOutputCollector; +private transient WindowLifecycleListener listener; +private transient WindowManager windowManager; + +public WindowedBoltExecutor(IWindowedBolt bolt) { +this.bolt = bolt; +} + +private int getTopologyTimeoutMillis(Map stormConf) { +if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) { +boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS); +if (!timeOutsEnabled) { +return Integer.MAX_VALUE; +} +} +int timeout = 0; +if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) { +timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); +} +return timeout * 1000; +} + +private void ensureDurationLessThanTimeout(int duration, int timeout) { +if (duration > timeout) { +throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration + + " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS + + " value " + timeout); +} +} + +// TODO: add more validation +private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration, + Count slidingIntervalCount, Duration slidingIntervalDuration) { + +int topologyTimeout = getTopologyTimeoutMillis(stormConf); +if (windowLengthDuration != null && slidingIntervalDuration != null) { +ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout); +} else if (windowLengthDuration != null) { +ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout); +} +} + +private WindowManager initWindowManager(WindowLifecycleListener lifecycleListener, Map stormConf) { +WindowManager manager = new WindowManager<>(lifecycleListener); +Duration windowLengthDuration = null; +Count windowLengthCount = 
null; +Duration slidingIntervalDuration = null; +Count slidingIntervalCount = null; +if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) { +windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_
[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/855#discussion_r43922969 --- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java --- @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.windowing; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +import static backtype.storm.topology.base.BaseWindowedBolt.Count; +import static backtype.storm.topology.base.BaseWindowedBolt.Duration; + +/** + * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks + * on expiry of events or activation of the window due to {@link TriggerPolicy}. + * + * @param the type of event in the window. 
+ */ +public class WindowManager implements TriggerHandler { +private static final Logger LOG = LoggerFactory.getLogger(WindowManager.class); + +/** + * Expire old events every EXPIRE_EVENTS_THRESHOLD to + * keep the window size in check. + */ +public static final int EXPIRE_EVENTS_THRESHOLD = 100; + +private WindowLifecycleListener windowLifecycleListener; +private ConcurrentLinkedQueue<Event> window; +private EvictionPolicy evictionPolicy; +private TriggerPolicy triggerPolicy; +private List expiredEvents; +private Set<Event> prevWindowEvents; +private AtomicInteger eventsSinceLastExpiry; +private ReentrantLock lock; + +public WindowManager(WindowLifecycleListener lifecycleListener) { +windowLifecycleListener = lifecycleListener; +window = new ConcurrentLinkedQueue<>(); +expiredEvents = new ArrayList<>(); +prevWindowEvents = new HashSet<>(); +eventsSinceLastExpiry = new AtomicInteger(); +lock = new ReentrantLock(true); +} + +public void setWindowLength(Count count) { +this.evictionPolicy = new CountEvictionPolicy<>(count.value); +} + +public void setWindowLength(Duration duration) { +this.evictionPolicy = new TimeEvictionPolicy<>(duration.value); +} + +public void setSlidingInterval(Count count) { +this.triggerPolicy = new CountTriggerPolicy<>(count.value, this); +} + +public void setSlidingInterval(Duration duration) { +this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this); +} + +/** + * Add an event into the window, with {@link System#currentTimeMillis()} as + * the tracking ts. + * + * @param event the event to add + */ +public void add(T event) { +add(event, System.currentTimeMillis()); +} + +/** + * Add an event into the window, with the given ts as the tracking ts. + * + * @param event the event to track + * @param tsthe timestamp + */ +public void add(T event, long ts) { +Event windowEvent = new EventImpl(event, ts); +window.add(windowEvent); +track(windowEvent); +compactWindow(); +} + +/** + * The callback invoked by the trigger policy. 
+ */ +@Override +public void onTrigger() { +List<Event> windowEvents = new ArrayList<>(); +List expired = null; +try { +lock.lock(); +/* + * scan the entire window to handle out of order events in + * the case of time based windows. +
[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...
Github user Parth-Brahmbhatt commented on the pull request:

    https://github.com/apache/storm/pull/854#issuecomment-153836316

    +1.
[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/855#discussion_r43894593 --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java --- @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.topology.base.BaseWindowedBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import backtype.storm.windowing.TupleWindow; +import storm.starter.bolt.PrinterBolt; + +import java.util.Map; +import java.util.Random; + +import static backtype.storm.topology.base.BaseWindowedBolt.Count; +import static backtype.storm.topology.base.BaseWindowedBolt.Duration; + +/** + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt} + * to calculate sliding window sum. 
+ */ +public class SlidingWindowTopology { + +/* + * emits random integers every 100 ms + */ +private static class RandomIntegerSpout extends BaseRichSpout { +SpoutOutputCollector collector; + +@Override +public void declareOutputFields(OutputFieldsDeclarer declarer) { +declarer.declare(new Fields("value")); +} + +@Override +public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { +this.collector = collector; +} + +@Override +public void nextTuple() { +Utils.sleep(100); +Random rand = new Random(); +Integer value = rand.nextInt(1000); +collector.emit(new Values(value)); +} +} + +/* + * Computes sliding window sum + */ +private static class SlidingWindowSumBolt extends BaseWindowedBolt { +private int sum = 0; +private OutputCollector collector; + +@Override +public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { +this.collector = collector; +} + +@Override +public void execute(TupleWindow inputWindow) { +System.out.println("Events in current window: " + inputWindow.get().size()); --- End diff -- Please remove the System.out.println call.
[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/855#discussion_r43894940 --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java --- @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.topology.base.BaseWindowedBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.Utils; +import backtype.storm.windowing.TupleWindow; +import storm.starter.bolt.PrinterBolt; + +import java.util.Map; +import java.util.Random; + +import static backtype.storm.topology.base.BaseWindowedBolt.Count; +import static backtype.storm.topology.base.BaseWindowedBolt.Duration; + +/** + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt} + * to calculate sliding window sum. 
+ */ +public class SlidingWindowTopology { + +/* + * emits random integers every 100 ms + */ +private static class RandomIntegerSpout extends BaseRichSpout { +SpoutOutputCollector collector; + +@Override +public void declareOutputFields(OutputFieldsDeclarer declarer) { +declarer.declare(new Fields("value")); +} + +@Override +public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { +this.collector = collector; +} + +@Override +public void nextTuple() { +Utils.sleep(100); +Random rand = new Random(); +Integer value = rand.nextInt(1000); +collector.emit(new Values(value)); +} +} + +/* + * Computes sliding window sum + */ +private static class SlidingWindowSumBolt extends BaseWindowedBolt { +private int sum = 0; +private OutputCollector collector; + +@Override +public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { +this.collector = collector; +} + +@Override +public void execute(TupleWindow inputWindow) { +System.out.println("Events in current window: " + inputWindow.get().size()); +for (Tuple tuple : inputWindow.getNew()) { +sum += (int) tuple.getValue(0); +} +for (Tuple tuple : inputWindow.getExpired()) { +sum -= (int) tuple.getValue(0); +} +collector.emit(new Values(sum)); --- End diff -- I think this is trying to show an optimization where we dont calculate the entire sum each time, instead we just add the new values and subtract the expired one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
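The optimization described in the comment above can be sketched in plain Java without any Storm classes. `IncrementalWindowSum` and `slide` are hypothetical names used only for illustration; the two lists stand in for what `inputWindow.getNew()` and `inputWindow.getExpired()` return in the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Storm: keeps a running sum across window slides
// instead of re-adding every value in the window each time.
class IncrementalWindowSum {
    private long sum = 0;

    // Apply one slide: add the values that entered the window, subtract the ones that left.
    long slide(List<Integer> added, List<Integer> expired) {
        for (int v : added) {
            sum += v;
        }
        for (int v : expired) {
            sum -= v;
        }
        return sum;
    }

    public static void main(String[] args) {
        IncrementalWindowSum window = new IncrementalWindowSum();
        // Window is {1, 2, 3}.
        System.out.println(window.slide(Arrays.asList(1, 2, 3), Collections.<Integer>emptyList())); // 6
        // 4 enters, 1 expires: window is {2, 3, 4}.
        System.out.println(window.slide(Arrays.asList(4), Arrays.asList(1))); // 9
    }
}
```

Each slide costs time proportional to the number of new and expired values rather than to the window length, which is presumably the point the example topology is making.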
[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/855#discussion_r43918453 --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java --- @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.topology; + +import backtype.storm.Config; +import backtype.storm.task.IOutputCollector; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import backtype.storm.windowing.TupleWindowImpl; +import backtype.storm.windowing.WindowLifecycleListener; +import backtype.storm.windowing.WindowManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static backtype.storm.topology.base.BaseWindowedBolt.Count; +import static backtype.storm.topology.base.BaseWindowedBolt.Duration; + +/** + * An {@link IWindowedBolt} wrapper that does the windowing of tuples. 
+ */ +public class WindowedBoltExecutor implements IRichBolt { +private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class); + +private IWindowedBolt bolt; +private transient WindowedOutputCollector windowedOutputCollector; +private transient WindowLifecycleListener listener; +private transient WindowManager windowManager; + +public WindowedBoltExecutor(IWindowedBolt bolt) { +this.bolt = bolt; +} + +private int getTopologyTimeoutMillis(Map stormConf) { +if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) { +boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS); +if (!timeOutsEnabled) { +return Integer.MAX_VALUE; +} +} +int timeout = 0; +if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) { +timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); +} +return timeout * 1000; +} + +private void ensureDurationLessThanTimeout(int duration, int timeout) { +if (duration > timeout) { +throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration + + " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS + + " value " + timeout); +} +} + +// TODO: add more validation +private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration, + Count slidingIntervalCount, Duration slidingIntervalDuration) { + +int topologyTimeout = getTopologyTimeoutMillis(stormConf); +if (windowLengthDuration != null && slidingIntervalDuration != null) { +ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout); +} else if (windowLengthDuration != null) { +ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout); +} +} + +private WindowManager initWindowManager(WindowLifecycleListener lifecycleListener, Map stormConf) { +WindowManager manager = new WindowManager<>(lifecycleListener); +Duration windowLengthDuration = null; +Count windowLengthCount = 
null; +Duration slidingIntervalDuration = null; +Count slidingIntervalCount = null; +if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) { --- End diff -- Given we are getting these from storm config, does this mean a user can not hav
[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...
Github user Parth-Brahmbhatt commented on a diff in the pull request: https://github.com/apache/storm/pull/855#discussion_r43918998 --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java --- @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.topology.base; + +import backtype.storm.Config; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IWindowedBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.windowing.TupleWindow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class BaseWindowedBolt implements IWindowedBolt { +private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class); + +private transient Map<String, Object> windowConfiguration; + +/** + * Holds a count value for count based windows and sliding intervals. + */ +public static class Count { +public final int value; + +public Count(int value) { +this.value = value; +} +} + +/** + * Holds a Time duration for time based windows and sliding intervals. 
+ */ +public static class Duration { +public final int value; + +public Duration(int value, TimeUnit timeUnit) { +this.value = (int) timeUnit.toMillis(value); +} +} + +protected BaseWindowedBolt() { +windowConfiguration = new HashMap<>(); +} + +private BaseWindowedBolt withWindowLength(Count count) { +windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value); +return this; +} + +private BaseWindowedBolt withWindowLength(Duration duration) { + windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value); +return this; +} + +private BaseWindowedBolt withSlidingInterval(Count count) { + windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value); +return this; +} + +private BaseWindowedBolt withSlidingInterval(Duration duration) { + windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value); +return this; +} + +/** + * Tuple count based sliding window configuration. + * + * @param windowLengththe number of tuples in the window + * @param slidingInterval the number of tuples after which the window slides + */ +public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) { +return withWindowLength(windowLength).withSlidingInterval(slidingInterval); +} + +/** + * Tuple count and time duration based sliding window configuration. + * + * @param windowLengththe number of tuples in the window + * @param slidingInterval the time duration after which the window slides + */ +public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) { +return withWindowLength(windowLength).withSlidingInterval(slidingInterval); +} + +/** + * Time duration and count based sliding window configuration. 
+ * + * @param windowLengththe time duration of the window + * @param slidingInterval the number of tuples after which the window slides + */ +public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) { +return withWindowLength(windowLength).withSlidingInterval(slidingInterval); +} + +/** + * Time duration based sliding window configuration
Re: [VOTE] Release Apache Storm 0.10.0 (rc1)
+1 Tested locally. On 10/29/15, 12:46 PM, "P. Taylor Goetz" wrote: >+1 (binding) > >Ran a suite of fault tolerance tests. > >-Taylor > >> On Oct 23, 2015, at 4:26 PM, P. Taylor Goetz wrote: >> >> This is a call to vote on releasing Apache Storm 0.10.0 (rc1) >> >> Full list of changes in this release: >> >> >>https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;hb=d02f94268dec229d1125a24fdf53fa303cbc2b29 >> >> The tag/commit to be voted upon is v0.10.0: >> >> >>https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=45b1b148401fd05f0f79cc7abdf6b5c7fc43df20;hb=d02f94268dec229d1125a24fdf53fa303cbc2b29 >> >> The source archive being voted upon can be found here: >> >> >>https://dist.apache.org/repos/dist/dev/storm/apache-storm-0.10.0/apache-storm-0.10.0-src.tar.gz >> >> Other release files, signatures and digests can be found here: >> >> https://dist.apache.org/repos/dist/dev/storm/apache-storm-0.10.0/ >> >> The release artifacts are signed with the following key: >> >> >>https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd >> >> The Nexus staging repository for this release is: >> >> https://repository.apache.org/content/repositories/orgapachestorm-1025 >> >> Please vote on releasing this package as Apache Storm 0.10.0. >> >> When voting, please list the actions taken to verify the release. >> >> This vote will be open for at least 72 hours. >> >> [ ] +1 Release this package as Apache Storm 0.10.0 >> [ ] 0 No opinion >> [ ] -1 Do not release this package because... >> >> Thanks to everyone who contributed to this release. >> >> -Taylor >
[jira] [Created] (STORM-1147) Storm JDBCBolt should add validation to ensure either insertQuery or table name is specified and not both.
Parth Brahmbhatt created STORM-1147: --- Summary: Storm JDBCBolt should add validation to ensure either insertQuery or table name is specified and not both. Key: STORM-1147 URL: https://issues.apache.org/jira/browse/STORM-1147 Project: Apache Storm Issue Type: Bug Components: storm-jdbc Affects Versions: 0.10.0 Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Priority: Trivial Fix For: 0.11.0 The JDBCBolt takes either an insert query or a table name, but it does not validate that only one of the two options is provided. We should add a validation check and throw an exception with a clear message to avoid confusion. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
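A minimal sketch of the kind of check this issue asks for, written as plain Java so it does not depend on storm-jdbc. `InsertTargetValidator`, its `validate` method, and the exception messages are assumptions for illustration, not the code that the eventual patch added.

```java
// Hypothetical validator; the actual bolt's field names and messages may differ.
class InsertTargetValidator {
    // Accepts exactly one of tableName / insertQuery; rejects both-set and neither-set.
    static void validate(String tableName, String insertQuery) {
        boolean hasTable = tableName != null && !tableName.trim().isEmpty();
        boolean hasQuery = insertQuery != null && !insertQuery.trim().isEmpty();
        if (hasTable && hasQuery) {
            throw new IllegalArgumentException(
                "Specify either a table name or an insert query, not both.");
        }
        if (!hasTable && !hasQuery) {
            throw new IllegalArgumentException(
                "Specify either a table name or an insert query.");
        }
    }

    public static void main(String[] args) {
        validate("user", null); // passes: only the table name is set
        try {
            validate("user", "insert into user values (?, ?)");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Running the check when the bolt is prepared, rather than on the first tuple, would surface the misconfiguration at startup; where the real patch places it is not stated in this message.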
[GitHub] storm pull request: STORM-1147: Added validation checks and tests.
GitHub user Parth-Brahmbhatt opened a pull request: https://github.com/apache/storm/pull/831 STORM-1147: Added validation checks and tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-1147 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/831.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #831 commit c367fdbd88bf7c8a5b20dc37299bc357b33fd437 Author: Parth Brahmbhatt <brahmbhatt.pa...@gmail.com> Date: 2015-10-29T21:29:17Z STORM-1147: Added validation checks and tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (STORM-1139) Issues regarding storm-postgresql interface
[ https://issues.apache.org/jira/browse/STORM-1139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14981895#comment-14981895 ] Parth Brahmbhatt commented on STORM-1139: - You can post your question to u...@storm.apache.org; see https://storm.apache.org/community.html for details on how to subscribe. If I understand correctly, you want to write a Storm topology in which one of the components writes to a PostgreSQL database. We have a JDBC connector that you can try out: https://github.com/apache/storm/tree/master/external/storm-jdbc. For an example topology see https://github.com/apache/storm/blob/master/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java > Issues regarding storm-postgresql interface > --- > > Key: STORM-1139 > URL: https://issues.apache.org/jira/browse/STORM-1139 > Project: Apache Storm > Issue Type: Bug >Reporter: hima >Assignee: hima > > hai > I am trying to write storm bolt to insert data in postgesql DB.But i am > facing issues like > java.io.NotSerializableException:org.postgresql.jdbc4.Jdbc4Connection. > Can anyone provide me full code for storm bolt that can insert data into > postgres database. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] Accept Alibaba JStorm Code Donation
+1. On 10/27/15, 10:56 AM, "Bobby Evans" wrote: >+1 >Apache is about community over code, and this should help to merge the >two communities, at the cost of working through merging the two code >bases. > - Bobby > > > On Tuesday, October 27, 2015 12:49 PM, P. Taylor Goetz > wrote: > > > All, >The IP Clearance process for the Alibaba JStorm code donation has >completed. >The IP Clearance Status document can be found here: >http://incubator.apache.org/ip-clearance/storm-jstorm.html >The source code can be found at https://github.com/alibaba/jstorm with >the following git commit SHA: e935da91a897797dad56e24c4ffa57860ac91878 >This is a VOTE to accept the code donation, and import the donated code >into the Apache Storm git repository. Discussion regarding how to proceed >with merging the codebases can take place in a separate thread. >[ ] +1 Accept the Alibaba JStorm code donation. >[ ] +0 Indifferent >[ ] -1 Do not accept the code donation because... >This VOTE will be open for at least 72 hours. >-Taylor > > > >
[GitHub] storm pull request: [STORM-1115] Stale leader-lock key effectively...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/802#issuecomment-148763402 @revans2 The log concerns were about the original PR from @danielschonfeld, which he has since fixed; I guess he force-pushed the branch. I am +1 on this change too. @danielschonfeld On a side note, can you provide any steps to reproduce this locally?
[GitHub] storm pull request: [STORM-1115] Stale leader-lock key effectively...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/802#issuecomment-148524703 There are a lot of unnecessary log statements; can you remove them?
[GitHub] storm pull request: [STORM-1104] Nimbus HA fails to find newly dow...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/794#issuecomment-147019720 +1, good catch.
[jira] [Resolved] (STORM-726) Adding nimbus.host config for backward compatibility of client config
[ https://issues.apache.org/jira/browse/STORM-726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt resolved STORM-726. Resolution: Fixed > Adding nimbus.host config for backward compatibility of client config > - > > Key: STORM-726 > URL: https://issues.apache.org/jira/browse/STORM-726 > Project: Apache Storm > Issue Type: Sub-task > Components: storm-core > Reporter: Parth Brahmbhatt >Assignee: Parth Brahmbhatt > > As part of the Nimbus HA initiative we added nimbus discovery for clients based on > a new config called nimbus.seeds, where users can specify a list of nimbus > hosts that the clients can contact to figure out the leader nimbus address. We > deleted the nimbus.host config, which is one value that all users modify in > their cluster setup. Deleting this config is a backward incompatible change > and will pretty much force everyone to update their client config even if > they don't want nimbus HA. For backward compatibility it is better to fail over > to nimbus.host when the nimbus.seeds config has no value. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
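The fallback described in this issue might look roughly like the following. Only the two config keys, `nimbus.seeds` and `nimbus.host`, come from the message; `NimbusSeedResolver`, `resolveSeeds`, and the use of a plain `Map` as the client config are assumptions made to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical resolver, not Storm's actual client code.
class NimbusSeedResolver {
    // Prefer nimbus.seeds; fall back to the legacy single-host nimbus.host when seeds are absent or empty.
    static List<String> resolveSeeds(Map<String, Object> conf) {
        Object seeds = conf.get("nimbus.seeds");
        if (seeds instanceof List && !((List<?>) seeds).isEmpty()) {
            List<String> hosts = new ArrayList<>();
            for (Object seed : (List<?>) seeds) {
                hosts.add(String.valueOf(seed));
            }
            return hosts;
        }
        Object legacyHost = conf.get("nimbus.host");
        if (legacyHost != null) {
            return Collections.singletonList(legacyHost.toString());
        }
        throw new IllegalStateException("Neither nimbus.seeds nor nimbus.host is configured");
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("nimbus.host", "legacy-nimbus");
        System.out.println(resolveSeeds(conf)); // falls back to the legacy host
        conf.put("nimbus.seeds", Arrays.asList("nimbus1", "nimbus2"));
        System.out.println(resolveSeeds(conf)); // seeds take precedence once set
    }
}
```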
[jira] [Resolved] (STORM-654) Create a thrift API to discover nimbus so all the clients are not forced to contact zookeeper.
[ https://issues.apache.org/jira/browse/STORM-654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt resolved STORM-654. Resolution: Fixed > Create a thrift API to discover nimbus so all the clients are not forced to > contact zookeeper. > -- > > Key: STORM-654 > URL: https://issues.apache.org/jira/browse/STORM-654 > Project: Apache Storm > Issue Type: Sub-task > Components: storm-core >Reporter: Parth Brahmbhatt > Assignee: Parth Brahmbhatt > > Current implementation of Nimbus-HA requires each nimbus client to discover > nimbus hosts by contacting zookeeper. In order to reduce the load on > zookeeper we could expose a thrift API as described in the future improvement > section of the Nimbus HA design doc. > We will add an extra field in ClusterSummary structure called nimbuses. > struct ClusterSummary { > 1: required list supervisors; > 2: required i32 nimbus_uptime_secs; > 3: required list topologies; > 4: required list nimbuses; > } > struct NimbusSummary { > 1: require string host; > 2: require int port; > 3: require int uptimeSecs; > 4: require boolean isLeader; > 5: require string version; > 6: optional list local_storm_ids; //need a better name but these > are list of storm-ids for which this nimbus host has the code available > locally. > } > We will create a nimbus.hosts configuration which will serve as the seed list > of nimbus hosts. Any nimbus host can serve the read requests so any client > can issue getClusterSummary call and they can extract the leader nimbus > summary from the list of nimbuses. All nimbus hosts will cache this > information to reduce the load on zookeeper. > In addition we can add a RedirectException. When a request that can only be > served by leader nimbus (i.e. 
submit, kill, rebalance, deactivate, activate) > is issued against a non leader nimbus, the non leader nimbus will throw a > RedirectException and the client will handle the exception by refreshing > their leader nimbus host and contacting that host as part of retry. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
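The redirect-and-retry behaviour proposed above could be handled on the client side along these lines. This is a sketch under stated assumptions: `LeaderRedirectSketch`, `NimbusCall`, and `callOnLeader` are hypothetical, the `RedirectException` here is a local unchecked stand-in rather than a Thrift-generated type, and the `leaderLookup` supplier stands in for a `getClusterSummary` call from which the client extracts the leader.

```java
import java.util.function.Supplier;

// Hypothetical sketch; none of these types are Storm's actual client classes.
class LeaderRedirectSketch {
    // Local stand-in: unchecked and without payload. A Thrift-generated exception would be checked.
    static class RedirectException extends RuntimeException {
        RedirectException(String message) {
            super(message);
        }
    }

    // A leader-only operation (submit, kill, rebalance, ...) issued against one host.
    interface NimbusCall<T> {
        T on(String host);
    }

    // Try the call; on a redirect, refresh the leader and retry, up to maxRedirects times.
    static <T> T callOnLeader(String host, Supplier<String> leaderLookup,
                              int maxRedirects, NimbusCall<T> call) {
        String target = host;
        for (int redirects = 0; ; redirects++) {
            try {
                return call.on(target);
            } catch (RedirectException e) {
                if (redirects >= maxRedirects) {
                    throw e; // give up rather than loop forever during a leadership change
                }
                target = leaderLookup.get();
            }
        }
    }
}
```

Bounding the number of redirects is a design choice of this sketch, not something the issue specifies; it keeps a client from spinning if leadership is changing while it retries.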
[jira] [Resolved] (STORM-655) Ad replication count as part of topology summary.
[ https://issues.apache.org/jira/browse/STORM-655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt resolved STORM-655. Resolution: Fixed > Ad replication count as part of topology summary. > - > > Key: STORM-655 > URL: https://issues.apache.org/jira/browse/STORM-655 > Project: Apache Storm > Issue Type: Sub-task > Components: storm-core > Reporter: Parth Brahmbhatt >Assignee: Parth Brahmbhatt > > With Nimbus HA each topology is replicated across multiple nimbus hosts. We > want to modify the UI/REST/Thrift APIs so we can expose the replication count > of a topology. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] storm pull request: [STORM-412] Allow users to modify logging leve...
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/766#issuecomment-145263234 +1.
Re: Restart Nimbus After Submitting a Topology?
Can you elaborate or send logs to explain what you mean by "bad state"? Restarting Nimbus should not do any harm, but it should not be needed just because of a new topology submission. Thanks Parth On 10/2/15, 10:29 AM, "abe oppenheim" wrote: >restart the Nimbus after submitting a new topology? >
[GitHub] storm pull request: STORM-1079. Batch Puts to HBase.
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/772#issuecomment-145159356 Why would tuples start to time out? Before the tuple timeout occurs, a tick tuple fires and tries to flush the batch even if it has not reached the desired size. I thought the only difference users would see now is that their latency may go up in a low-throughput system unless they explicitly set the batch size to 1.
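The flushing rule discussed in this comment (flush when the batch is full, or when a tick arrives, whichever comes first) can be sketched without HBase or Storm. `TickFlushedBatch`, `add`, and `onTick` are hypothetical names; in the actual bolt the tick would be a tick tuple and the sink a batched write to HBase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer illustrating size-or-tick flushing; not the HBaseBolt implementation.
class TickFlushedBatch<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> pending = new ArrayList<>();

    TickFlushedBatch(int batchSize, Consumer<List<T>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Buffer one item; flush as soon as the batch reaches the configured size.
    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Called on every tick: flush whatever is pending, even a partial batch.
    void onTick() {
        flush();
    }

    private void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending)); // hand the sink its own copy
        pending.clear();
    }

    public static void main(String[] args) {
        TickFlushedBatch<String> batch = new TickFlushedBatch<>(3, b -> System.out.println("flush " + b));
        batch.add("a");
        batch.add("b");
        batch.onTick(); // partial batch [a, b] is flushed by the tick
        batch.add("c");
        batch.add("d");
        batch.add("e"); // full batch [c, d, e] is flushed by size
    }
}
```

With a batch size of 1 every `add` flushes immediately, which matches the comment's point that users who care about latency can opt out of batching; the tick interval then only bounds how long a partial batch can wait.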
[GitHub] storm pull request: STORM-1079. Batch Puts to HBase.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

https://github.com/apache/storm/pull/772#discussion_r41083829

--- Diff: external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java ---
@@ -53,21 +61,62 @@ public HBaseBolt withConfigKey(String configKey) {
         return this;
     }
 
+    public HBaseBolt withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
+        this.flushIntervalSecs = flushIntervalSecs;
+        return this;
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Map<String, Object> conf = super.getComponentConfiguration();
+        if (conf == null)
+            conf = new Config();
+
+        if (flushIntervalSecs > 0) {
+            LOG.info("Enabling tick tuple with interval [" + flushIntervalSecs + "]");
+            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
+        }
+
+        return conf;
+    }
+
+
     @Override
     public void execute(Tuple tuple) {
-        byte[] rowKey = this.mapper.rowKey(tuple);
-        ColumnList cols = this.mapper.columns(tuple);
-        List mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
+        boolean flush = false;
+        if (TupleUtils.isTick(tuple)) {
+            LOG.debug("TICK received! current batch status [" + tupleBatch.size() + "/" + batchSize + "]");
+            flush = true;
+        } else {
+            byte[] rowKey = this.mapper.rowKey(tuple);
+            ColumnList cols = this.mapper.columns(tuple);
+            List mutations = hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL : Durability.SKIP_WAL);
+            batchMutations.addAll(mutations);
+            tupleBatch.add(tuple);
+            if (tupleBatch.size() >= batchSize)
--- End diff --

I can vouch for the existence of such a community :-).
[GitHub] storm pull request: STORM-1079. Batch Puts to HBase.
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/772#issuecomment-144565292 +1.
[GitHub] storm pull request: STORM-817: Support for Kafka Wildcard topics
Github user Parth-Brahmbhatt commented on the pull request: https://github.com/apache/storm/pull/561#issuecomment-143808292 +1