[GitHub] storm issue #2760: STORM-3123: Storm Kafka Monitor cannot connect to Kafka o...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2760 @VipinRathor You should also update https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java and https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java That will make sure that lag shows up in the Storm UI as well when the spout is talking to Kafka on an SSL listener. Can you also test it? ---
[GitHub] storm pull request #2760: STORM-3123: Storm Kafka Monitor cannot connect to ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/2760#discussion_r202213538 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java --- @@ -145,7 +148,8 @@ public static void main(String args[]) { NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG), commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), - commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol); + commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol, --- End diff -- I suggest adding a check in the old spout's if block and throwing an error if this option is provided with the old spout. ---
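The suggested guard could look something like the sketch below. The class and parameter names here are hypothetical stand-ins for the actual command-line wiring in KafkaOffsetLagUtil, not the real implementation:

```java
// Minimal sketch of rejecting a new-spout-only option when the old spout is
// selected. "oldSpout" and "securityProtocolGiven" stand in for whatever
// booleans the real option parsing produces.
public class OptionGuard {
    public static void validate(boolean oldSpout, boolean securityProtocolGiven) {
        if (oldSpout && securityProtocolGiven) {
            throw new IllegalArgumentException(
                "The security protocol option is only supported with the new kafka spout");
        }
    }
}
```

The point of failing fast here is that silently ignoring the option would leave the user believing SSL is in effect for the old spout when it is not.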
[GitHub] storm issue #2670: STORM-3068: STORM_JAR_JVM_OPTS are not passed to storm-ka...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2670 +1(NB) ---
[GitHub] storm issue #2670: STORM-3068: STORM_JAR_JVM_OPTS are not passed to storm-ka...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2670 @rajkrrsingh When you say STORM_JAR_JVM_OPTS is not propagating to the java args, where is it not propagating from? I don't see it being set anywhere in this script, and I see that it sources the storm-env.sh script. So I was just suggesting adding a check, like we have for JAVA_HOME, to handle the case where it's not defined. ---
[GitHub] storm issue #2670: STORM-3068: STORM_JAR_JVM_OPTS are not passed to storm-ka...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2670 @rajkrrsingh Is the presumption that STORM_JAR_JVM_OPTS will be set? Have you checked that it's okay when the variable is not set? Can you also document this change in the README? ---
[GitHub] storm pull request #2665: STORM-2988 Error on initialization of server mk-wo...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/2665#discussion_r186564359 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java --- @@ -67,7 +68,7 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormCon } public static String getMetricsJMXDomain(Map reporterConf) { -return Utils.getString(reporterConf, JMX_DOMAIN); +return Utils.getString(reporterConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null); --- End diff -- @arunmahadevan I am not saying we should return that as the default value. I am suggesting using it as the key to look up the conf map. ---
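The distinction being drawn is between passing the whole map plus a key versus passing an already-looked-up value with a default. A minimal sketch of the second shape follows; the helper below only imitates the `Utils.getString(Object, String)` signature from the diff and is not the Storm implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrates using a config key to look up the reporter conf map, then
// falling back to a default when the key is absent.
public class DomainLookup {
    // Stand-in for Utils.getString(Object, String): stringify a value or default.
    public static String getString(Object value, String defaultValue) {
        return value == null ? defaultValue : value.toString();
    }

    public static String getMetricsJmxDomain(Map<String, Object> reporterConf, String domainKey) {
        // The key indexes the map; the looked-up value (not the map) goes to getString.
        return getString(reporterConf.get(domainKey), null);
    }
}
```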
[GitHub] storm pull request #2665: STORM-2988 Error on initialization of server mk-wo...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/2665#discussion_r186560080 --- Diff: storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java --- @@ -67,7 +68,7 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormCon } public static String getMetricsJMXDomain(Map reporterConf) { -return Utils.getString(reporterConf, JMX_DOMAIN); +return Utils.getString(reporterConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null); --- End diff -- The JMX_DOMAIN key used initially is different from Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN. I think we should stick to the JMX_DOMAIN config key unless @ptgoetz thinks this is good. ---
[GitHub] storm issue #1505: STORM-1136: Command line module to return kafka spout off...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1505 @lichenglongqi You should not have to do anything. The lag should automatically show up in the Storm UI on the topology page after you submit a topology that has a kafka spout in it. 1.0.4 does not have that feature. Can you try 1.1.1? ---
[GitHub] storm issue #2150: STORM-2541: Fix storm-kafka-client manual subscription no...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2150 @srdo @harshach Some comments related to storm-kafka-monitor. [~Srdo] Regarding the first point: because the lag request is an HTTP pull request from the UI, as long as kafkaSpoutConfig.getSubscription().getTopicsString(); returns the correct value it will work, since the open method would have been called eventually. The only change we would need is that when the rebalance listener is invoked we keep track of the topics currently subscribed. For example, PatternSubscription can have an instance variable, Set<String> topics, that is correctly updated any time onPartitionsRevoked or onPartitionsAssigned is called. We can use that instance variable to return the value when getTopicsString is called on that object. Does that work? Regarding point 2, we could move the kafka client version to the storm-kafka-monitor module. Can you elaborate a bit more on your concern? Is it that if the kafka cluster is upgraded, storm-kafka-monitor won't work? In that case the storm-kafka-client module will also have to be updated and the topology jar rebuilt. Correct? I think in general we have a certain compatibility restriction, in that a given storm release works or has been tested with a certain version of kafka clients. Correct me if I am wrong. Regarding point 3, the main reason for creating a separate module and calling a bash script from the Storm UI is so that storm-core does not have a direct kafka dependency, since that made sense. For windows, cygwin can be a workaround. Plus, I don't know how many windows users we have. We can start a thread to see if there is really a user base for which we need to support that use case. I don't know the details about how metrics would work. We could have opinions from other people. If that is indeed the right way to go about it then I am all for changing it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
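The tracking idea for the first point can be sketched without any Kafka dependency. The `TopicTracker` class below is a hypothetical illustration, not the actual `PatternSubscription` code; in the real spout the two update methods would be driven by `ConsumerRebalanceListener.onPartitionsAssigned` and `onPartitionsRevoked`:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Sketch of keeping a topic set current across rebalances, so a lag query
// arriving at any time can read the currently subscribed topics.
public class TopicTracker {
    private final Set<String> topics = new HashSet<>();

    // Would be invoked from onPartitionsAssigned with the assigned topics.
    public void onAssigned(Set<String> assignedTopics) {
        topics.addAll(assignedTopics);
    }

    // Would be invoked from onPartitionsRevoked with the revoked topics.
    public void onRevoked(Set<String> revokedTopics) {
        topics.removeAll(revokedTopics);
    }

    // Comma-separated view, analogous to getTopicsString(); sorted for stability.
    public String getTopicsString() {
        return String.join(",", new TreeSet<>(topics));
    }
}
```

Because the UI only pulls this value over HTTP, the tracker never needs to push anything; it just has to stay consistent with the consumer's rebalance callbacks.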
[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2155 +1(NB), given that people prefer keeping the Builder class around. ---
[GitHub] storm pull request #2168: STORM-2562: Use stronger key size than default for...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/2168 STORM-2562: Use stronger key size than default for blow fish key gene… …rator and get rid of stack trace You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-2562-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2168.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2168 commit cee60478cd557cb96dab63fbb9e5f8d262e91487 Author: Priyank <ps...@hortonworks.com> Date: 2017-06-20T19:01:13Z STORM-2562: Use stronger key size than default for blow fish key generator and get rid of stack trace ---
[GitHub] storm pull request #2167: STORM-2562: Use stronger key size than default for...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/2167 STORM-2562: Use stronger key size than default for blow fish key gene… …rator and get rid of stack trace You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-2562 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2167.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2167 commit 3baf61bad84001cd3ed21d30dc56b3df60e75e20 Author: Priyank <ps...@hortonworks.com> Date: 2017-06-20T19:01:13Z STORM-2562: Use stronger key size than default for blow fish key generator and get rid of stack trace ---
[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2155 @srdo Overall, this looks good. But I also think we should get rid of the Builder class entirely. The reason is that it does not work well with flux yaml, since you create the object using builder.build(). Let me know your thoughts on that. ---
[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/2155#discussion_r121530308 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java --- @@ -79,16 +78,46 @@ UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST } +/** + * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class. + * Users who need different deserializers should use the Builder constructors instead, + * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)} + * @param bootstrapServers The bootstrap servers for the consumer + * @param topics The topics to subscribe to + * @return The new builder + */ public static Builder<String, String> builder(String bootstrapServers, String ... topics) { -return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); +return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics)); } +/** + * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class. + * Users who need different deserializers should use the Builder constructors instead, + * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)} + * @param bootstrapServers The bootstrap servers for the consumer + * @param topics The topics to subscribe to + * @return The new builder + */ public static Builder<String, String> builder(String bootstrapServers, Collection topics) { -return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); +return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics)); } +/** + * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class. 
+ * Users who need different deserializers should use the Builder constructors instead, + * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)} + * @param bootstrapServers The bootstrap servers for the consumer + * @param topics The topic pattern to subscribe to + * @return The new builder + */ public static Builder<String, String> builder(String bootstrapServers, Pattern topics) { -return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics); +return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics)); +} + +private static Builder<String, String> setDefaultStringDeserializers(Builder<String, String> builder) { +builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); --- End diff -- I think ByteArrayDeserializer makes more sense. Not a strong preference, though. ---
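The pattern in the diff, defaulting a property that the user can later override through the same `setProp` call, can be sketched with plain `Properties`. The `ConfigBuilder` class below is a hypothetical stand-in for `KafkaSpoutConfig.Builder`, and the key names only mirror the shape of the Kafka consumer config keys:

```java
import java.util.Properties;

// Sketch of the default-then-override pattern used by the builder()
// convenience methods: set String-deserializer defaults up front via setProp,
// and let a later setProp call on the same key replace them.
public class ConfigBuilder {
    public static final String KEY_DESERIALIZER = "key.deserializer";
    public static final String VALUE_DESERIALIZER = "value.deserializer";

    private final Properties props = new Properties();

    public ConfigBuilder setProp(String key, Object value) {
        props.put(key, value.toString());
        return this;
    }

    // Convenience entry point: defaults both deserializers, like builder(...).
    public static ConfigBuilder builder() {
        return new ConfigBuilder()
            .setProp(KEY_DESERIALIZER, "StringDeserializer")
            .setProp(VALUE_DESERIALIZER, "StringDeserializer");
    }

    public String get(String key) {
        return props.getProperty(key);
    }
}
```

Because the defaults go through the same property map as user-supplied values, a last-write-wins override needs no dedicated setter per property.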
[GitHub] storm issue #2126: Add the option to set client.id to storm-kafka and storm-...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2126 @carl34 Good point. I did not realize it was final until you brought it up. In that case I am okay with your change for the storm-kafka module. For the storm-kafka-client module I still prefer avoiding the setClientId method; I would rather use setProp instead. Can you use setProp on the builder for now? And when I create a PR for refactoring storm-kafka-client you can comment about any issues that you might run into because of the changes in the PR. ---
[GitHub] storm issue #2126: Add the option to set client.id to storm-kafka and storm-...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2126 @carl34 https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java#L32 is already a public property. Are you saying passing it as a constructor argument is clearer than setting a public property? If so, I don't agree with you on that. That extra constructor does not add any value. Plus, the community is encouraged to use the new spout, so unless there is a blocker or critical bug in the old spout I prefer not changing any code there. For storm-kafka-client I plan to raise a PR to get rid of the builder pattern. The builder pattern relies on calling builder.build() and passing the returned object to the constructor. Unfortunately, that does not work well with flux. Hence the setProp method, either on the builder (or maybe in future directly on KafkaSpoutConfig), is encouraged. There are so many other properties on the kafka consumer api that it does not make sense to expose each one of them as a setXXX java method. Without those setXXX methods it is a much cleaner api for the spout and its config object. Intellisense should not be the top priority here; I would prefer a cleaner api. ---
[GitHub] storm issue #2126: Add the option to set client.id to storm-kafka and storm-...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2126 @carl34 We already have a setProp method in KafkaSpoutConfig in storm-kafka-client that can be used to set any consumer property. I don't think this adds any value. Can you elaborate on why you need that method? As far as the storm-kafka module is concerned, that property is already public. I don't see a reason for adding one more constructor argument. ---
[GitHub] storm pull request #2116: STORM-2512: Make constructor public and add one mo...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/2116 STORM-2512: Make constructor public and add one more builder constructor You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-2512 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2116.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2116 commit 9c616fd5ee887e2cf17582b4e80adc3b82158c37 Author: Priyank <ps...@hortonworks.com> Date: 2017-05-13T06:32:08Z STORM-2512: Make constructor public and add one more builder constructor ---
[GitHub] storm pull request #2115: STORM-2512: Make constructor public and add one mo...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/2115 STORM-2512: Make constructor public and add one more builder constructor You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-2512-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2115.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2115 commit 5ff7865cf0b86f40e99b54e789fa60b8843191aa Author: Priyank <ps...@hortonworks.com> Date: 2017-05-13T06:32:08Z STORM-2512: Make constructor public and add one more builder constructor ---
[GitHub] storm issue #2081: [STORM-2482] Refactor the Storm auto credential plugins t...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/2081 +1(non-binding) ---
[GitHub] storm pull request #2081: [STORM-2482] Refactor the Storm auto credential pl...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/2081#discussion_r113541511 --- Diff: external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java --- @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.common; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.math3.util.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.storm.security.INimbusCredentialPlugin; +import org.apache.storm.security.auth.IAutoCredentials; +import org.apache.storm.security.auth.ICredentialsRenewer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import javax.xml.bind.DatatypeConverter; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * The base class that for auto credential plugins that abstracts out some of the common functionality. 
+ */ +public abstract class AbstractAutoCreds implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin { +private static final Logger LOG = LoggerFactory.getLogger(AbstractAutoCreds.class); +public static final String CONFIG_KEY_RESOURCES = "resources"; + +private List configKeys = new ArrayList<>(); +private Map<String, Map<String, Object>> configMap = new HashMap<>(); + +@Override +public void prepare(Map conf) { +doPrepare(conf); +String configKeyString = getConfigKeyString(); +if (conf.containsKey(configKeyString)) { +configKeys.addAll((List) conf.get(configKeyString)); +for (String key : configKeys) { +if (conf.containsKey(key)) { +Map<String, Object> config = (Map<String, Object>) conf.get(key); +configMap.put(key, config); +LOG.info("configKey = {}, config = {}", key, config); +} +} +} +} + +@Override +public void populateCredentials(Map<String, String> credentials, Map conf) { +try { +if (configKeys != null) { +Map<String, Object> updatedConf = updateConfigs(conf); +for (String configKey : configKeys) { +credentials.put(getCredentialKey(configKey), + DatatypeConverter.printBase64Binary(getHadoopCredentials(updatedConf, configKey))); +} +} else { +credentials.put(getCredentialKey(StringUtils.EMPTY), + DatatypeConverter.printBase64Binary(getHadoopCredentials(conf))); +} +LOG.info("Tokens added to credentials map."); +} catch (Exception e) { +LOG.error("Could not populate credentials.", e); +} +} + +private Map<String, Object> updateConfigs(Map topologyConf) { +Map<String, Object> res = new HashMap<>(topologyConf); +if (configKeys != null) { +for (String configKey : configKeys) { +if (!res.containsKey(configKey) && configMap.containsKey(configKey)) { +res.put(configKey, configMap.
[GitHub] storm pull request #2081: [STORM-2482] Refactor the Storm auto credential pl...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/2081#discussion_r113535108 --- Diff: external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java --- @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.common; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.math3.util.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.storm.security.INimbusCredentialPlugin; +import org.apache.storm.security.auth.IAutoCredentials; +import org.apache.storm.security.auth.ICredentialsRenewer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.security.auth.Subject; +import javax.xml.bind.DatatypeConverter; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * The base class that for auto credential plugins that abstracts out some of the common functionality. 
+ */ +public abstract class AbstractAutoCreds implements IAutoCredentials, ICredentialsRenewer, INimbusCredentialPlugin { +private static final Logger LOG = LoggerFactory.getLogger(AbstractAutoCreds.class); +public static final String CONFIG_KEY_RESOURCES = "resources"; + +private List configKeys = new ArrayList<>(); +private Map<String, Map<String, Object>> configMap = new HashMap<>(); + +@Override +public void prepare(Map conf) { +doPrepare(conf); +String configKeyString = getConfigKeyString(); +if (conf.containsKey(configKeyString)) { +configKeys.addAll((List) conf.get(configKeyString)); +for (String key : configKeys) { +if (conf.containsKey(key)) { +Map<String, Object> config = (Map<String, Object>) conf.get(key); +configMap.put(key, config); +LOG.info("configKey = {}, config = {}", key, config); +} +} +} +} + +@Override +public void populateCredentials(Map<String, String> credentials, Map conf) { +try { +if (configKeys != null) { +Map<String, Object> updatedConf = updateConfigs(conf); +for (String configKey : configKeys) { +credentials.put(getCredentialKey(configKey), + DatatypeConverter.printBase64Binary(getHadoopCredentials(updatedConf, configKey))); +} +} else { +credentials.put(getCredentialKey(StringUtils.EMPTY), + DatatypeConverter.printBase64Binary(getHadoopCredentials(conf))); +} +LOG.info("Tokens added to credentials map."); +} catch (Exception e) { +LOG.error("Could not populate credentials.", e); +} +} + +private Map<String, Object> updateConfigs(Map topologyConf) { +Map<String, Object> res = new HashMap<>(topologyConf); +if (configKeys != null) { +for (String configKey : configKeys) { +if (!res.containsKey(configKey) && configMap.containsKey(configKey)) { +res.put(configKey, configMap.
[GitHub] storm pull request #2081: [STORM-2482] Refactor the Storm auto credential pl...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/2081#discussion_r113517229 --- Diff: docs/storm-hbase.md --- @@ -56,22 +56,43 @@ The approach described above requires that all potential worker hosts have "stor multiple topologies on a cluster , each with different hbase user, you will have to create multiple keytabs and distribute it to all workers. Instead of doing that you could use the following approach: -Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user. -The nimbus need to start with following configurations: +Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user. The nimbus should be started with following configurations: +``` nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"] nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"] hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hbase super user that can impersonate other users.) hbase.kerberos.principal: "superu...@example.com" -nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and can not be renewed, -if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml than you should ensure this value is -atleast 1 hour less then that.) +nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by default expire every 7 days and can not be renewed, if you have custom settings for hbase.auth.token.max.lifetime in hbase-site.xml than you should ensure this value is atleast 1 hour less then that.) 
+``` Your topology configuration should have: -topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] + +``` +topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] +``` If nimbus did not have the above configuration you need to add it and then restart it. Ensure the hbase configuration -files(core-site.xml,hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all the dependencies is present in nimbus's classpath. +files(core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all the dependencies is present in nimbus's classpath. + +As an alternative to adding the configuration files (core-site.xml, hdfs-site.xml and hbase-site.xml) to the classpath, you could specify the configurations as a part of the topology configuration. E.g. in you custom storm.yaml (or -c option while submitting the topology), + +``` +hbaseCredentialsConfigKeys : ["cluster1", "cluster2"] (the hbase clusters you want to fetch the tokens from) +cluster1: [{"config1": "value1", "config2": "value2", ... }] (A map of config key-values specific to cluster1) +cluster2: [{"config1": "value1", "hbase.keytab.file": "/path/to/keytab/for/cluster2/on/nimubs", "hbase.kerberos.principal": "cluster2u...@example.com"}] (here along with other configs, we have custom keytab and principal for "cluster2" which will override the keytab/principal specified at topology level) +``` + +Instead of specifying key values you may also directly specify the resource files for e.g., + +``` +cluster1: [{"resources": ["/path/to/core-site1.xml", "/path/to/hbase-site1.xml"]}] +cluster2: [{"resources": ["/path/to/core-site2.xml", "/path/to/hbase-site2.xml"]}] +``` + +Storm will download the tokens separately for each of the clusters and populate it into the subject and also renew the tokens periodically. +This way it would be possible to run multiple bolts connecting to separate HBase cluster within the same topology. 
+ --- End diff -- Just a few lines below it says "As nimbus is impersonating. and it mentions storm.kerberos.principal. Is that right, or should we change it to hbase.kerberos.principal? ---
[GitHub] storm pull request #2081: [STORM-2482] Refactor the Storm auto credential pl...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/2081#discussion_r113515141 --- Diff: docs/SECURITY.md ---
@@ -423,16 +423,18 @@ nimbus.impersonation.acl:
 ### Automatic Credentials Push and Renewal
 Individual topologies have the ability to push credentials (tickets and tokens) to workers so that they can access secure services. Exposing this to all of the users can be a pain for them.
-To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed.
-These are controlled by the following configs. topology.auto-credentials is a list of java plugins, all of which must implement IAutoCredentials interface, that populate the credentials on gateway
-and unpack them on the worker side. On a kerberos secure cluster they should be set by default to point to org.apache.storm.security.auth.kerberos.AutoTGT.
-nimbus.credential.renewers.classes should also be set to this value so that nimbus can periodically renew the TGT on behalf of the user.
+To hide this from them in the common case plugins can be used to populate the credentials, unpack them on the other side into a java Subject, and also allow Nimbus to renew the credentials if needed. These are controlled by the following configs.
+
+`topology.auto-credentials` is a list of java plugins, all of which must implement the `IAutoCredentials` interface, that populate the credentials on gateway
--- End diff -- Is "gateway" here nimbus or a namenode-like entity? Should we rename it to nimbus? ---
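[Editor's note] The gateway/worker split the quoted documentation describes maps onto the plugin contract itself. The sketch below paraphrases the shape of `org.apache.storm.security.auth.IAutoCredentials` (method names reproduced from memory; verify against the Storm source) with a purely illustrative no-op implementation — the token key and value are invented for the example:

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.Subject;

// Paraphrased shape of Storm's auto-credentials contract: populateCredentials
// runs on the submitting ("gateway") side; the Subject methods run on workers.
interface AutoCredentialsSketch {
    void populateCredentials(Map<String, String> credentials);
    void populateSubject(Subject subject, Map<String, String> credentials);
    void updateSubject(Subject subject, Map<String, String> credentials);
}

public class NoOpAutoCredentials implements AutoCredentialsSketch {
    @Override
    public void populateCredentials(Map<String, String> credentials) {
        // Submit side: serialize a ticket/token into the shared credentials map.
        credentials.put("sketch.token", "opaque-serialized-token");
    }

    @Override
    public void populateSubject(Subject subject, Map<String, String> credentials) {
        // Worker side: deserialize the token and attach it to the Subject.
        subject.getPublicCredentials().add(credentials.get("sketch.token"));
    }

    @Override
    public void updateSubject(Subject subject, Map<String, String> credentials) {
        // Called when Nimbus pushes renewed credentials to running workers.
        populateSubject(subject, credentials);
    }

    public static void main(String[] args) {
        Map<String, String> creds = new HashMap<>();
        new NoOpAutoCredentials().populateCredentials(creds);
        System.out.println(creds);
    }
}
```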
[GitHub] storm pull request #1978: STORM-2387 Handle tick tuples properly for Bolts i...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1978#discussion_r103527554 --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
@@ -76,30 +77,33 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
 @Override
 public void execute(final Tuple tuple) {
-    Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
-    LOG.debug("Sent tuple : [{}]", tuple);
+    if (TupleUtils.isTick(tuple)) {
--- End diff -- I agree with @satishd. Right now, the onTickTuple method is in the abstract base classes of different connectors and not even used. Taking Satish's approach is a little more work, since we will have to change the bolts that actually process the tick tuples to separate out the onProcess and onTickTuple logic, but I think it's worth it. @dossett I did not exactly get the problem with exceptions. Can't we have a catch-all and rethrow as a RuntimeException? ---
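[Editor's note] The separation discussed here — a base class dispatching tick tuples to `onTickTuple` so connector bolts only implement their processing logic — can be sketched without Storm on the classpath. `Tuple` is a stand-in for `org.apache.storm.tuple.Tuple`, and `isTick` mimics `org.apache.storm.utils.TupleUtils.isTick`; all class names here are hypothetical:

```java
// Dependency-free sketch of the "base class dispatches, subclasses override"
// pattern proposed in the review: execute() routes tick tuples to onTickTuple()
// so connector bolts only implement process().
public class TickAwareBoltSketch {
    // Stand-in for org.apache.storm.tuple.Tuple.
    static class Tuple {
        final boolean tick;
        Tuple(boolean tick) { this.tick = tick; }
    }

    abstract static class TickAwareBolt {
        // Mimics TupleUtils.isTick(tuple): true for the system tick stream.
        boolean isTick(Tuple tuple) { return tuple.tick; }

        final String execute(Tuple tuple) {
            if (isTick(tuple)) {
                onTickTuple(tuple);
                return "tick";
            }
            process(tuple);
            return "data";
        }

        void onTickTuple(Tuple tuple) { /* default: flush buffers, no-op here */ }
        abstract void process(Tuple tuple);
    }

    static class LoggingBolt extends TickAwareBolt {
        @Override void process(Tuple tuple) { /* forward to the external system */ }
    }

    static String run(boolean tick) {
        return new LoggingBolt().execute(new Tuple(tick));
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints "tick"
        System.out.println(run(false)); // prints "data"
    }
}
```

Making `execute` final in the base class is what guarantees every connector handles ticks consistently, which is the point of the refactor under discussion.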
[GitHub] storm issue #1815: STORM-2235 Introduce new option: 'add remote repositories...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1815 +1 (non-binding) ---
[GitHub] storm pull request #1806: STORM-2226: Fix kafka spout offset lag ui for kerb...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1806 STORM-2226: Fix kafka spout offset lag ui for kerberized kafka You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-2226 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1806.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1806 commit 8cd0507003629110598c2e6f774523a3e62665fc Author: Priyank <ps...@hortonworks.com> Date: 2016-11-30T04:56:01Z STORM-2226: Fix kafka spout offset lag ui for kerberized kafka ---
[GitHub] storm issue #1665: STORM-2074: fix storm-kafka-monitor NPE bug
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1665 +1 (non-binding) ---
[GitHub] storm issue #1665: STORM-2074: fix storm-kafka-monitor NPE bug
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1665 @vesense Not sure if you saw my comment about exiting with a code of 1 in getOldConsumerOffsetsFromZk. Can you check that comment and address it? ---
[GitHub] storm pull request #1665: STORM-2074: fix storm-kafka-monitor NPE bug
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1665#discussion_r77237088 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
@@ -89,6 +89,10 @@ public static void main (String args[]) {
 printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " + OPTION_OLD_CONSUMER_LONG);
 }
+String zkNode = commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG);
--- End diff -- I am not sure if just checking the length <= 1 is really that useful. As you mentioned, it should start with /; maybe we should add a check satisfying all the conditions for a valid zookeeper node name? Or just remove the check altogether? We already have a check later to see if the node exists. ---
[GitHub] storm pull request #1665: STORM-2074: fix storm-kafka-monitor NPE bug
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1665#discussion_r77034491 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
@@ -373,16 +377,20 @@ private static Options buildOptions () {
 curatorFramework.start();
 String partitionPrefix = "partition_";
 String zkPath = oldKafkaSpoutOffsetQuery.getZkPath();
-if (!zkPath.endsWith("/")) {
-    zkPath += "/";
+if (zkPath.endsWith("/")) {
--- End diff -- Is there a reason for removing the trailing / here and then appending it in the loop below? The loop already has a check for curatorFramework.checkExists().forPath(path); maybe we can just print and exit there? Also, can you call the printUsageAndExit method here to be consistent? Or is there a reason you could not use it? ---
[GitHub] storm pull request #1665: STORM-2074: fix storm-kafka-monitor NPE bug
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1665#discussion_r77033989 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
@@ -89,6 +89,10 @@ public static void main (String args[]) {
 printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " + OPTION_OLD_CONSUMER_LONG);
 }
+String zkNode = commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG);
--- End diff -- While adding OPTION_ZK_COMMITTED_NODE_LONG to options, it is added using the method options.addOption(String opt, String longOpt, boolean hasArg, String description). The third argument is set to true in the call, which specifies that the argument must have a value. Doesn't that take care of a null or empty value? Do we need this explicit check here? ---
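[Editor's note] The three comments above circle the same question: what makes a committed-offset zookeeper path valid beyond a `length <= 1` check? A hypothetical standalone validation (not the patch's actual code) collecting the conditions raised in the review — value present, leading `/`, and no trailing `/` except for the root — might look like this:

```java
// Hypothetical validation for a zookeeper node path, collecting the conditions
// discussed in the review: non-empty, absolute (leading "/"), and no trailing
// "/" — with the root path "/" as the one legal exception.
public class ZkPathCheck {
    static boolean isValidZkPath(String path) {
        if (path == null || path.isEmpty()) {
            return false; // missing value; commons-cli hasArg=true still allows ""
        }
        if (!path.startsWith("/")) {
            return false; // zookeeper paths are absolute
        }
        if (path.length() > 1 && path.endsWith("/")) {
            return false; // no trailing slash except the root itself
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidZkPath("/kafka/consumers")); // true
        System.out.println(isValidZkPath("relative/path"));    // false
    }
}
```

As the review notes, this only filters obviously malformed input; whether the node actually exists is still checked later against the live ensemble via `checkExists()`.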
[GitHub] storm pull request #1607: STORM-2021: Add missing licenses.
Github user priyank5485 closed the pull request at: https://github.com/apache/storm/pull/1607 ---
[GitHub] storm issue #1607: STORM-2021: Add missing licenses.
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1607 Closing this in favor of https://github.com/apache/storm/pull/1606/files ---
[GitHub] storm pull request #1607: STORM-2021: Add missing licenses.
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1607 STORM-2021: Add missing licenses. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-2021 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1607.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1607 commit 05cd5e47173814c050c52a76dbe9577a7aef8e30 Author: Priyank <ps...@hortonworks.com> Date: 2016-08-04T17:04:31Z STORM-2021: Add missing licenses. ---
[GitHub] storm issue #1605: STORM-2014: Put logic around dropping messages into Retry...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1605 +1 (non-binding). I had talked to @hmcl briefly about this. We should have his review as well. ---
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r7301 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java --- @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.storm.spout.SpoutOutputCollector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +class KinesisRecordsManager { +private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); +// object handling zk interaction +private transient ZKConnection zkConnection; +// object handling interaction with kinesis +private transient KinesisConnection kinesisConnection; +// Kinesis Spout KinesisConfig object +private transient final KinesisConfig kinesisConfig; +// Queue of records per shard fetched from kinesis and are waiting to be emitted +private transient Map<String, LinkedList> toEmitPerShard = new HashMap<>(); +// Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted +private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>(); +// Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the +// sequence number to commit. 
Logic explained in commit +private transient Map<String, TreeSet> emittedPerShard = new HashMap<>(); +// sorted acked sequence numbers - needed to figure out what sequence number can be committed +private transient Map<String, TreeSet> ackedPerShard = new HashMap<>(); +// sorted failed sequence numbers - needed to figure out what sequence number can be committed +private transient Map<String, TreeSet> failedPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for new messages +private transient Map<String, String> shardIteratorPerShard = new HashMap<>(); +// last fetched sequence number corresponding to position in shard +private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for failed messages +private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>(); +// timestamp to decide when to commit to zk again +private transient long lastCommitTime; +// boolean to track deactivated state +private transient boolean deactivated; + +KinesisRecordsManager (KinesisConfig kinesisConfig) { +this.kinesisConfig = kinesisConfig; +this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo()); +this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo()); +} + +void initialize (int myTaskIndex, int totalTasks) { +deactivated = false; +lastCommitTime = System.currentTimeMillis(); +kinesisConnection.initialize(); +zkConnection.initialize(); +List shards = kinesisConnection.getShardsForStream(kines
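[Editor's note] The per-shard bookkeeping quoted above keeps sorted acked and failed sequence-number sets precisely so the spout can work out the highest sequence number safe to commit. A toy reduction of that idea — not the actual `KinesisRecordsManager` algorithm, whose details are in the commit message it references — is: commit the largest prefix of emitted sequence numbers that have all been acked:

```java
import java.util.Arrays;
import java.util.TreeSet;

// Toy reduction of the "sorted acked set" idea from the quoted fields: walk the
// emitted sequence numbers in order and advance the committable position while
// each one has been acked. Illustrative only, not the spout's actual logic.
public class CommitPositionSketch {
    static Long committable(TreeSet<Long> emitted, TreeSet<Long> acked) {
        Long commit = null;
        for (Long seq : emitted) {
            if (!acked.contains(seq)) {
                break; // first gap: nothing beyond this point is safe to commit
            }
            commit = seq;
        }
        return commit; // null if even the lowest emitted number is unacked
    }

    public static void main(String[] args) {
        TreeSet<Long> emitted = new TreeSet<>(Arrays.asList(1L, 2L, 3L, 4L));
        TreeSet<Long> acked = new TreeSet<>(Arrays.asList(1L, 2L, 4L));
        System.out.println(committable(emitted, acked)); // prints 2: seq 3 is pending
    }
}
```

A `TreeSet` makes this walk cheap because iteration is already in ascending order, which is why the quoted fields insist on sorted structures rather than plain hash sets.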
[GitHub] storm issue #1586: STORM-1839: Storm spout implementation for Amazon Kinesis...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1586 @harshach I added some more comments in the ack method and commit method for the acking logic. I also moved the code interacting with zk and kinesis into its own class to make KinesisRecordsManager smaller. Please review when you get a chance. ---
[GitHub] storm issue #1586: STORM-1839: Storm spout implementation for Amazon Kinesis...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1586 @harshach Thanks for the review. I have addressed your comments and pushed a commit. ---
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72306421 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java --- @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kinesis.spout; + +import java.io.Serializable; + +public class ZkInfo implements Serializable { +// comma separated list of zk connect strings to connect to zookeeper e.g. localhost:2181 +private final String zkUrl; +// zk node under which to commit the sequence number of messages. e.g. 
/committed_sequence_numbers +private final String zkNode; +// zk session timeout in milliseconds +private final Integer sessionTimeoutMs; +// zk connection timeout in milliseconds +private final Integer connectionTimeoutMs; +// interval at which to commit offsets to zk in milliseconds +private final Long commitIntervalMs; +// number of retry attempts for zk +private final Integer retryAttempts; +// time to sleep between retries in milliseconds +private final Integer retryIntervalMs; + +/** + * Default constructor that uses defaults for a local setup + */ +public ZkInfo () { +this("localhost:2181", "/kinesisOffsets", 2, 15000, 1L, 3, 2000); +} + +public ZkInfo (String zkUrl, String zkNode, Integer sessionTimeoutMs, Integer connectionTimeoutMs, Long commitIntervalMs, Integer retryAttempts, Integer +retryIntervalMs) { +this.zkUrl = zkUrl; +this.zkNode = zkNode; +this.sessionTimeoutMs = sessionTimeoutMs; +this.connectionTimeoutMs = connectionTimeoutMs; +this.commitIntervalMs = commitIntervalMs; +this.retryAttempts = retryAttempts; +this.retryIntervalMs = retryIntervalMs; +validate(); +} + +public String getZkUrl() { +return zkUrl; +} + +public String getZkNode() { +return zkNode; +} + +public Integer getSessionTimeoutMs() { +return sessionTimeoutMs; +} + +public Integer getConnectionTimeoutMs() { +return connectionTimeoutMs; +} + +public Long getCommitIntervalMs() { +return commitIntervalMs; +} + +public Integer getRetryAttempts() { +return retryAttempts; +} + +public Integer getRetryIntervalMs() { +return retryIntervalMs; +} + +private void validate () { + +if (zkUrl == null || zkUrl.length() < 1) { +throw new IllegalArgumentException("zkUrl must be specified to connect to zookeeper"); +} +if (zkNode == null || zkNode.length() < 1) { +throw new IllegalArgumentException("zkNode must be specified"); +} +checkPositive(sessionTimeoutMs, "sessionTimeoutMs"); +checkPositive(connectionTimeoutMs, "connectionTimeoutMs"); +checkPositive(commitIntervalMs, "commitIntervalMs"); 
+checkPositive(retryAttempts, "retryAttempts"); +checkPositive(retryIntervalMs, "retryIntervalMs"); +} + +private void checkPositive (Integer argument, String name) { +if (argument == null && argument <= 0) { +throw new IllegalArgumentException(name + " must be positive"); +} +} +private void checkPositive (Long argument, String name) { +if (argument == null && argument <= 0) { +throw new IllegalArgumentException(name + " must be positive"); +} +} + +@Override +public String toString() { +r
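[Editor's note] The quoted `checkPositive` condition, `argument == null && argument <= 0`, is buggy: when the argument is null the second operand unboxes it and throws a `NullPointerException`, and when it is non-null the conjunction is always false, so the intended `IllegalArgumentException` can never fire. A corrected standalone version (with `||` instead of `&&`; the generic form also collapses the duplicated Integer/Long overloads) might look like:

```java
// Corrected form of the quoted checkPositive: || instead of &&, with the
// null check short-circuiting before the value is unboxed and compared.
// A single generic method replaces the duplicated Integer/Long overloads.
public class CheckPositive {
    static <T extends Number> void checkPositive(T argument, String name) {
        if (argument == null || argument.longValue() <= 0) {
            throw new IllegalArgumentException(name + " must be positive");
        }
    }

    public static void main(String[] args) {
        checkPositive(15000, "sessionTimeoutMs"); // passes silently
        try {
            checkPositive((Integer) null, "retryAttempts");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "retryAttempts must be positive"
        }
    }
}
```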
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72306731 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java --- @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kinesis.spout; + +import java.io.Serializable; + +public class ZkInfo implements Serializable { +// comma separated list of zk connect strings to connect to zookeeper e.g. localhost:2181 +private final String zkUrl; +// zk node under which to commit the sequence number of messages. e.g. 
/committed_sequence_numbers +private final String zkNode; +// zk session timeout in milliseconds +private final Integer sessionTimeoutMs; +// zk connection timeout in milliseconds +private final Integer connectionTimeoutMs; +// interval at which to commit offsets to zk in milliseconds +private final Long commitIntervalMs; +// number of retry attempts for zk +private final Integer retryAttempts; +// time to sleep between retries in milliseconds +private final Integer retryIntervalMs; + +/** + * Default constructor that uses defaults for a local setup + */ +public ZkInfo () { +this("localhost:2181", "/kinesisOffsets", 2, 15000, 1L, 3, 2000); --- End diff -- Will remove it ---
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72306241 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java --- @@ -0,0 +1,566 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.regions.Region; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.zookeeper.CreateMode; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +class KinesisRecordsManager { +private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); +// zk interaction object +private transient CuratorFramework curatorFramework; +// Kinesis Spout Config object +private transient final Config config; +// Queue of records per shard fetched from kinesis and are waiting to be emitted +private transient Map<String, LinkedList> toEmitPerShard = new HashMap<>(); +// Map of records that were fetched from kinesis as a result 
of failure and are waiting to be emitted +private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>(); +// Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the +// sequence number to commit. Logic explained in commit +private transient Map<String, TreeSet> emittedPerShard = new HashMap<>(); +// sorted acked sequence numbers - needed to figure out what sequence number can be committed +private transient Map<String, TreeSet> ackedPerShard = new HashMap<>(); +// sorted failed sequence numbers - needed to figure out what sequence number can be committed +private transient Map<String, TreeSet> failedPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for new messages +private transient Map<String, String> shardIteratorPerShard = new HashMap<>(); +// last fetched sequence number corresponding to position in shard +private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for failed messages +private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>(); +// timestamp to decide when to commit to zk again +private transient long lastCommitTime; +// boolean to track deactivate
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72305349 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java --- @@ -0,0 +1,566 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.regions.Region; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryNTimes; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.zookeeper.CreateMode; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +class KinesisRecordsManager { +private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class); +// zk interaction object +private transient CuratorFramework curatorFramework; +// Kinesis Spout Config object +private transient final Config config; +// Queue of records per shard fetched from kinesis and are waiting to be emitted +private transient Map<String, LinkedList> toEmitPerShard = new HashMap<>(); +// Map of records that were fetched from kinesis as a result 
of failure and are waiting to be emitted +private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>(); +// Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the +// sequence number to commit. Logic explained in commit +private transient Map<String, TreeSet> emittedPerShard = new HashMap<>(); +// sorted acked sequence numbers - needed to figure out what sequence number can be committed +private transient Map<String, TreeSet> ackedPerShard = new HashMap<>(); +// sorted failed sequence numbers - needed to figure out what sequence number can be committed +private transient Map<String, TreeSet> failedPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for new messages +private transient Map<String, String> shardIteratorPerShard = new HashMap<>(); +// last fetched sequence number corresponding to position in shard +private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>(); +// shard iterator corresponding to position in shard for failed messages +private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>(); +// timestamp to decide when to commit to zk again +private transient long lastCommitTime; +// boolean to track deactivate
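The per-shard TreeSets in the diff above (ackedPerShard, failedPerShard, emittedPerShard) exist to answer one question: what is the highest sequence number that can safely be committed to ZooKeeper? A minimal sketch of that decision, with names and the exact rule as assumptions rather than the class's actual implementation:

```java
import java.math.BigInteger;
import java.util.TreeSet;

// Hypothetical sketch: the highest contiguous acked sequence number that is
// not blocked by an earlier still-pending (emitted-but-unacked or failed)
// sequence number is the one that is safe to commit.
class CommitPointSketch {
    static BigInteger committable(TreeSet<BigInteger> acked, TreeSet<BigInteger> pending) {
        BigInteger commit = null;
        for (BigInteger seq : acked) {
            // stop at the first acked number that sits after a pending one:
            // committing it would skip over an uncommitted record
            if (!pending.isEmpty() && pending.first().compareTo(seq) < 0) {
                break;
            }
            commit = seq;
        }
        return commit; // null if nothing can be committed yet
    }
}
```

This mirrors the scenario described later in the Config comments: if record 1 failed and 2 through 5 are acked, nothing past 1 can be committed until 1 resolves.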
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72305189 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java ---
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72303034 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java --- @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Regions; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Serializable; +import java.util.Arrays; + +public class KinesisConnectionInfo implements Serializable { +private final byte[] serializedKinesisCredsProvider; +private final byte[] serializedkinesisClientConfig; +private final Integer recordsLimit; +private final Regions region; + +private transient AWSCredentialsProvider credentialsProvider; +private transient ClientConfiguration clientConfiguration; + +/** + * + * @param credentialsProvider implementation to provide credentials to connect to kinesis + * @param clientConfiguration client configuration to pass to kinesis client + * @param region region to connect to + * @param recordsLimit max records to be fetched in a getRecords request to kinesis + */ +public KinesisConnectionInfo (AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, Regions region, Integer recordsLimit) { +if (recordsLimit == null || recordsLimit <= 0) { +throw new IllegalArgumentException("recordsLimit has to be a positive integer"); +} +if (region == null) { +throw new IllegalArgumentException("region cannot be null"); +} +serializedKinesisCredsProvider = getKryoSerializedBytes(credentialsProvider); +serializedkinesisClientConfig = getKryoSerializedBytes(clientConfiguration); +this.recordsLimit = recordsLimit; +this.region = region; + +this.credentialsProvider = null; +this.clientConfiguration = null; +} + +public Integer getRecordsLimit() { +return recordsLimit; +} + +public AWSCredentialsProvider getCredentialsProvider() { +if (credentialsProvider == null) 
{ +credentialsProvider = (AWSCredentialsProvider) this.getKryoDeserializedObject(serializedKinesisCredsProvider); +} +return credentialsProvider; +} + +public ClientConfiguration getClientConfiguration() { +if (clientConfiguration == null) { +clientConfiguration = (ClientConfiguration) this.getKryoDeserializedObject(serializedkinesisClientConfig); +} +return clientConfiguration; +} + +public Regions getRegion() { +return region; +} + +private byte[] getKryoSerializedBytes (final Object obj) { +final Kryo kryo = new Kryo(); +final ByteArrayOutputStream os = new ByteArrayOutputStream(); +final Output output = new Output(os); +kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); +kryo.writeClassAndObject(output, obj); +output.flush(); +return os.toByteArray(); +} + +private Object getKryoDeserializedObject (final byte[] ser) { +final Kryo kryo = new Kryo(); +final Input input = new Input(new ByteArrayInputStream(ser)); +kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); +return kryo.readClassAndObject(input); +} + +
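KinesisConnectionInfo serializes the credentials provider and client configuration to byte arrays up front and rebuilds them lazily in the getters, because those AWS objects are not themselves Serializable. The same serialize-to-bytes, rehydrate-on-first-access pattern can be sketched with plain JDK serialization (a hypothetical stand-in for the Kryo calls, which need the kryo and objenesis jars on the classpath):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Sketch only: JDK serialization in place of kryo.writeClassAndObject /
// kryo.readClassAndObject, to show the byte-array round trip in isolation.
class LazyBytesSketch {
    static byte[] toBytes(Serializable obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj); // analogous to kryo.writeClassAndObject(output, obj)
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object fromBytes(byte[] ser) {
        try {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(ser));
            return ois.readObject(); // analogous to kryo.readClassAndObject(input)
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The lazy null-check in the getters means deserialization happens at most once per worker, after the spout instance itself has been shipped to the cluster.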
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72302997 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java --- @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Regions; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.objenesis.strategy.StdInstantiatorStrategy; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Serializable; +import java.util.Arrays; + +public class KinesisConnectionInfo implements Serializable { +private final byte[] serializedKinesisCredsProvider; +private final byte[] serializedkinesisClientConfig; +private final Integer recordsLimit; +private final Regions region; + +private transient AWSCredentialsProvider credentialsProvider; +private transient ClientConfiguration clientConfiguration; + +/** + * + * @param credentialsProvider implementation to provide credentials to connect to kinesis + * @param clientConfiguration client configuration to pass to kinesis client + * @param region region to connect to + * @param recordsLimit max records to be fetched in a getRecords request to kinesis + */ +public KinesisConnectionInfo (AWSCredentialsProvider credentialsProvider, ClientConfiguration clientConfiguration, Regions region, Integer recordsLimit) { +if (recordsLimit == null || recordsLimit <= 0) { +throw new IllegalArgumentException("recordsLimit has to be a positive integer"); +} +if (region == null) { +throw new IllegalArgumentException("region cannot be null"); +} +serializedKinesisCredsProvider = getKryoSerializedBytes(credentialsProvider); +serializedkinesisClientConfig = getKryoSerializedBytes(clientConfiguration); +this.recordsLimit = recordsLimit; +this.region = region; + +this.credentialsProvider = null; --- End diff -- Will change it --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72291643 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java --- @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable { +private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class); +// Wait interfal for retrying after first failure +private final Long initialDelayMillis; +// Base for exponential function in seconds for retrying for second, third and so on failures +private final Long baseSeconds; +// Maximum number of retries +private final Long maxRetries; +// map to track number of failures for each kinesis message that failed +private Map<KinesisMessageId, Long> failCounts = new HashMap<>(); +// map to track next retry time for each kinesis message that failed +private Map<KinesisMessageId, Long> retryTimes = new HashMap<>(); +// sorted set of records to be retrued based on retry time. 
earliest retryTime record comes first +private SortedSet retryMessageSet = new TreeSet<>(new RetryTimeComparator()); + +/** + * no args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) secs for + * retry i where i = 2,3, + */ +public ExponentialBackoffRetrier () { +this(100L, 2L, Long.MAX_VALUE); +} + +/** + * + * @param initialDelayMillis delay in milliseconds for first retry + * @param baseSeconds base for exponent function in seconds + * @param maxRetries maximum number of retries before the record is discarded/acked + */ +public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) { +this.initialDelayMillis = initialDelayMillis; +this.baseSeconds = baseSeconds; +this.maxRetries = maxRetries; +validate(); +} + +private void validate () { +if (initialDelayMillis < 0) { +throw new IllegalArgumentException("initialDelayMillis cannot be negative." ); +} +if (baseSeconds < 0) { +throw new IllegalArgumentException("baseSeconds cannot be negative."); +} +if (maxRetries < 0) { +throw new IllegalArgumentException("maxRetries cannot be negative."); +} +} +@Override +public boolean failed(KinesisMessageId messageId) { +LOG.debug("Handling failed message " + messageId); +// if maxRetries is 0, dont retry and return false as per interface contract +if (maxRetries == 0) { +LOG.debug("maxRetries set to 0. Hence not queueing " + messageId); +return false; +} +// if first failure add it to the count map +if (!failCounts.containsKey(messageId)) { +failCounts.put(messageId, 0L); +} +// increment the fail count as we started with 0 +Long failCount = failCounts.get(messageId); +failCounts.put(messageId, ++failCount); +// if fail count is greater than maxRetries, discard or ack. for e.g. for maxRetries 3, 4 failures are allowed at maximum +if (failCount > maxRetries) { +LOG.debug("maxR
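The failed() method above keeps a per-message fail count and gives up once it exceeds maxRetries, with maxRetries == 0 meaning never retry. A stripped-down sketch of just that bookkeeping (class and method names are assumptions; the real class also schedules the retry time):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the fail-count logic in ExponentialBackoffRetrier.failed():
// increment per message, return false (discard/ack) once the count exceeds maxRetries.
class FailCountSketch {
    private final long maxRetries;
    private final Map<String, Long> failCounts = new HashMap<>();

    FailCountSketch(long maxRetries) {
        this.maxRetries = maxRetries;
    }

    boolean failed(String messageId) {
        if (maxRetries == 0) {
            return false; // per the interface contract: no retries at all
        }
        long failCount = failCounts.merge(messageId, 1L, Long::sum);
        if (failCount > maxRetries) {
            failCounts.remove(messageId); // give up; caller acks/discards the record
            return false;
        }
        return true; // schedule a retry
    }
}
```

With maxRetries = 3 this permits three retries; the fourth failure is the one that gets discarded, matching the "for maxRetries 3, 4 failures are allowed at maximum" comment in the diff.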
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72292758 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java --- @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable { +private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class); +// Wait interfal for retrying after first failure +private final Long initialDelayMillis; +// Base for exponential function in seconds for retrying for second, third and so on failures +private final Long baseSeconds; +// Maximum number of retries +private final Long maxRetries; +// map to track number of failures for each kinesis message that failed +private Map<KinesisMessageId, Long> failCounts = new HashMap<>(); +// map to track next retry time for each kinesis message that failed +private Map<KinesisMessageId, Long> retryTimes = new HashMap<>(); +// sorted set of records to be retrued based on retry time. earliest retryTime record comes first +private SortedSet retryMessageSet = new TreeSet<>(new RetryTimeComparator()); --- End diff -- No. Messages are not stored anywhere in ExponentialBackoffRetrier. retryTimes is just used to implement the comparator interface that will be used by retryMessageSet TreeSet to order the messages on their next retry time. You can check out the RetryTimeComparator in this class --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
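The reply above describes the key design point: the TreeSet holds only message ids, and ordering comes from a comparator that looks each id up in the external retryTimes map. A sketch of that arrangement with hypothetical names (the real RetryTimeComparator lives inside ExponentialBackoffRetrier):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Sketch: a TreeSet ordered by an external map, as the comment describes.
// The set stores ids; the comparator derives the order from retryTimes.
class RetryOrderSketch {
    final Map<String, Long> retryTimes = new HashMap<>();
    final TreeSet<String> retrySet = new TreeSet<>((a, b) -> {
        int byTime = Long.compare(retryTimes.get(a), retryTimes.get(b));
        return byTime != 0 ? byTime : a.compareTo(b); // tie-break so equal times keep distinct ids
    });

    void schedule(String messageId, long retryAtMillis) {
        retryTimes.put(messageId, retryAtMillis);
        retrySet.add(messageId);
    }

    String nextToRetry() {
        return retrySet.isEmpty() ? null : retrySet.first();
    }
}
```

One caveat of this design: the retry time for an id must not change while the id is in the set, or the TreeSet's ordering invariant breaks; the entry has to be removed and re-added instead.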
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72292144 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java --- @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kinesis.spout; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable { +private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class); +// Wait interfal for retrying after first failure +private final Long initialDelayMillis; --- End diff -- ExponentialBackoffRetrier has two constructors. The no args constructor will set the default values for these variables. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72291939 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java --- @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable { +private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class); +// Wait interfal for retrying after first failure +private final Long initialDelayMillis; +// Base for exponential function in seconds for retrying for second, third and so on failures +private final Long baseSeconds; +// Maximum number of retries +private final Long maxRetries; +// map to track number of failures for each kinesis message that failed +private Map<KinesisMessageId, Long> failCounts = new HashMap<>(); +// map to track next retry time for each kinesis message that failed +private Map<KinesisMessageId, Long> retryTimes = new HashMap<>(); +// sorted set of records to be retrued based on retry time. 
earliest retryTime record comes first +private SortedSet retryMessageSet = new TreeSet<>(new RetryTimeComparator()); + +/** + * no args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) secs for + * retry i where i = 2,3, + */ +public ExponentialBackoffRetrier () { +this(100L, 2L, Long.MAX_VALUE); +} + +/** + * + * @param initialDelayMillis delay in milliseconds for first retry + * @param baseSeconds base for exponent function in seconds + * @param maxRetries maximum number of retries before the record is discarded/acked + */ +public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) { +this.initialDelayMillis = initialDelayMillis; +this.baseSeconds = baseSeconds; +this.maxRetries = maxRetries; +validate(); +} + +private void validate () { +if (initialDelayMillis < 0) { +throw new IllegalArgumentException("initialDelayMillis cannot be negative." ); +} +if (baseSeconds < 0) { +throw new IllegalArgumentException("baseSeconds cannot be negative."); +} +if (maxRetries < 0) { +throw new IllegalArgumentException("maxRetries cannot be negative."); +} +} +@Override +public boolean failed(KinesisMessageId messageId) { +LOG.debug("Handling failed message " + messageId); +// if maxRetries is 0, dont retry and return false as per interface contract +if (maxRetries == 0) { +LOG.debug("maxRetries set to 0. Hence not queueing " + messageId); --- End diff -- I will change it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72291774 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java --- @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kinesis.spout; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable { +private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class); +// Wait interfal for retrying after first failure +private final Long initialDelayMillis; +// Base for exponential function in seconds for retrying for second, third and so on failures +private final Long baseSeconds; +// Maximum number of retries +private final Long maxRetries; +// map to track number of failures for each kinesis message that failed +private Map<KinesisMessageId, Long> failCounts = new HashMap<>(); +// map to track next retry time for each kinesis message that failed +private Map<KinesisMessageId, Long> retryTimes = new HashMap<>(); +// sorted set of records to be retrued based on retry time. 
earliest retryTime record comes first +private SortedSet retryMessageSet = new TreeSet<>(new RetryTimeComparator()); + +/** + * no args constructor that uses defaults of 100 ms for first retry, max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) secs for + * retry i where i = 2,3, + */ +public ExponentialBackoffRetrier () { +this(100L, 2L, Long.MAX_VALUE); +} + +/** + * + * @param initialDelayMillis delay in milliseconds for first retry + * @param baseSeconds base for exponent function in seconds + * @param maxRetries maximum number of retries before the record is discarded/acked + */ +public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) { +this.initialDelayMillis = initialDelayMillis; +this.baseSeconds = baseSeconds; +this.maxRetries = maxRetries; +validate(); +} + +private void validate () { +if (initialDelayMillis < 0) { +throw new IllegalArgumentException("initialDelayMillis cannot be negative." ); +} +if (baseSeconds < 0) { +throw new IllegalArgumentException("baseSeconds cannot be negative."); +} +if (maxRetries < 0) { +throw new IllegalArgumentException("maxRetries cannot be negative."); +} +} +@Override +public boolean failed(KinesisMessageId messageId) { +LOG.debug("Handling failed message " + messageId); +// if maxRetries is 0, dont retry and return false as per interface contract +if (maxRetries == 0) { +LOG.debug("maxRetries set to 0. Hence not queueing " + messageId); +return false; +} +// if first failure add it to the count map +if (!failCounts.containsKey(messageId)) { +failCounts.put(messageId, 0L); +} +// increment the fail count as we started with 0 +Long failCount = failCounts.get(messageId); +failCounts.put(messageId, ++failCount); +// if fail count is greater than maxRetries, discard or ack. for e.g. for maxRetries 3, 4 failures are allowed at maximum +if (failCount > maxRetries) { +LOG.debug("maxRet
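The javadoc in the diff above spells out the delay schedule: the first retry fires after initialDelayMillis, and retry i (i >= 2) after Math.pow(baseSeconds, i - 1) seconds. That schedule, extracted into a standalone sketch (the helper name is an assumption):

```java
// Hypothetical sketch of the backoff schedule described in the
// ExponentialBackoffRetrier javadoc: linear first step, exponential after.
class BackoffScheduleSketch {
    static long nextDelayMillis(long initialDelayMillis, long baseSeconds, long failCount) {
        if (failCount <= 1) {
            return initialDelayMillis; // first failure: fixed initial delay
        }
        // failure i >= 2: baseSeconds^(i-1) seconds, converted to millis
        return (long) (Math.pow(baseSeconds, failCount - 1) * 1000L);
    }
}
```

With the defaults (100 ms, base 2) this gives 100 ms, 2 s, 4 s, 8 s, ... for successive failures of the same message.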
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72291464 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java --- @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class Config implements Serializable {
+    // kinesis stream name to read from
+    private final String streamName;
+    // shard iterator type based on kinesis api - beginning of time, latest, at timestamp are only supported
+    private final ShardIteratorType shardIteratorType;
+    // implementation for converting a Kinesis record to a storm tuple
+    private final RecordToTupleMapper recordToTupleMapper;
+    // timestamp to be used for shardIteratorType AT_TIMESTAMP - can be null
+    private final Date timestamp;
+    // implementation for handling the failed messages retry logic
+    private final FailedMessageRetryHandler failedMessageRetryHandler;
+    // object capturing all zk related information for storing committed sequence numbers
+    private final ZkInfo zkInfo;
+    // object representing information on paramaters to use while connecting to kinesis using kinesis client
+    private final KinesisConnectionInfo kinesisConnectionInfo;
+    // this number represents the number of messages that are still not committed to zk. it will prevent the spout from emitting further.
+    // for e.g. if 1 failed and 2,3,4,5 all have been acked by storm, they still cant be committed to zk because 1 is still in failed set. As a result
+    // the acked queue can infinitely grow without any of them being committed to zk.
+    // topology max pending does not help since from storm's view they are acked
+    private final Long maxUncommittedRecords;
+
+    public Config (String streamName, ShardIteratorType shardIteratorType, RecordToTupleMapper recordToTupleMapper, Date timestamp,
+                   FailedMessageRetryHandler failedMessageRetryHandler, ZkInfo zkInfo, KinesisConnectionInfo kinesisConnectionInfo, Long maxUncommittedRecords) {
+        this.streamName = streamName;
+        this.shardIteratorType = shardIteratorType;
+        this.recordToTupleMapper = recordToTupleMapper;
+        this.timestamp = timestamp;
+        this.failedMessageRetryHandler = failedMessageRetryHandler;
+        this.zkInfo = zkInfo;
+        this.kinesisConnectionInfo = kinesisConnectionInfo;
+        this.maxUncommittedRecords = maxUncommittedRecords;
+        validate();
+    }
+
+    private void validate () {
+        if (streamName == null || streamName.length() < 1) {
+            throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
+        }
+        if (shardIteratorType == null || shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+                || shardIteratorType.equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) {
+            throw new IllegalArgumentException("shardIteratorType has to be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + ShardIteratorType.LATEST
+                    + "," + ShardIteratorType.TRIM_HORIZON);
+        }
+        if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && timestamp == null) {
+            throw new IllegalArgumentException("timestamp must be provided if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
+        }
+        if (recordToTupleMapper == null) {
+            throw new IllegalArgumentException("recordToTupleMapper cannot be null");
+        }
+        if (failedMessageRetryHandler == null) {
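The fail-fast validation in the Config constructor quoted above (reject an unusable combination at construction time, rather than failing later when the topology runs) can be sketched as follows. The enum and class below are simplified hypothetical stand-ins, not the actual storm-kinesis types:

```java
import java.util.Date;

public class IteratorConfigSketch {
    // Simplified stand-in for ShardIteratorType; names are hypothetical.
    enum IteratorType { TRIM_HORIZON, LATEST, AT_TIMESTAMP }

    private final String streamName;
    private final IteratorType iteratorType;
    private final Date timestamp;

    public IteratorConfigSketch(String streamName, IteratorType iteratorType, Date timestamp) {
        this.streamName = streamName;
        this.iteratorType = iteratorType;
        this.timestamp = timestamp;
        validate(); // throws immediately on a bad combination
    }

    private void validate() {
        if (streamName == null || streamName.isEmpty()) {
            throw new IllegalArgumentException("streamName is required and cannot be of length 0.");
        }
        // AT_TIMESTAMP is only meaningful with a timestamp to start from
        if (iteratorType == IteratorType.AT_TIMESTAMP && timestamp == null) {
            throw new IllegalArgumentException("timestamp must be provided for AT_TIMESTAMP");
        }
    }

    public static void main(String[] args) {
        new IteratorConfigSketch("my-stream", IteratorType.LATEST, null); // fine
        try {
            new IteratorConfigSketch("my-stream", IteratorType.AT_TIMESTAMP, null); // rejected
        } catch (IllegalArgumentException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```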
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1586#discussion_r72291240 --- Diff: external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java --- @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kinesis.spout; + +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Date; + +public class Config implements Serializable { --- End diff -- Will change it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1586 STORM-1839: Storm spout implementation for Amazon Kinesis Streams. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1839 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1586.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1586 commit de68c267fcb7555c7729c9377d3f6d1e504ec25e Author: Priyank <ps...@hortonworks.com> Date: 2016-07-12T19:17:54Z STORM-1839: Storm spout implementation for Amazon Kinesis Streams. ---
[GitHub] storm issue #1576: Kafka Spout New Consumer API - KafkaSpoutRetryExponential...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1576 +1 ---
[GitHub] storm issue #1576: Kafka Spout New Consumer API - KafkaSpoutRetryExponential...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1576 @hmcl One minor thing. Should we handle long overflows? ---
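The overflow question above is worth illustrating: for large retry counts an exponential delay overflows `long`. One defensive option, sketched below with a hypothetical helper (not the PR's code), leans on the fact that `Math.pow` computes in `double` and Java's double-to-long narrowing conversion saturates at `Long.MAX_VALUE` instead of wrapping, then clamps against an explicit ceiling:

```java
public class OverflowSafeBackoff {
    // Hypothetical helper showing one way to keep an exponential delay bounded:
    // compute in double (no wraparound), let the narrowing cast saturate at
    // Long.MAX_VALUE, and clamp to a configured ceiling.
    static long cappedDelayMillis(long baseSeconds, long retryNum, long maxDelayMillis) {
        double delayMillis = Math.pow(baseSeconds, retryNum - 1) * 1000.0;
        return Math.min((long) delayMillis, maxDelayMillis);
    }

    public static void main(String[] args) {
        System.out.println(cappedDelayMillis(2L, 3, 60000L));   // small retry count: uncapped
        System.out.println(cappedDelayMillis(2L, 500, 60000L)); // would overflow long: clamped
    }
}
```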
[GitHub] storm pull request #1505: STORM-1136: Command line module to return kafka sp...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1505 STORM-1136: Command line module to return kafka spout offsets lag and… … display in storm ui. STORM-1136: Missed file in previous commit You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1136-master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1505.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1505 commit 482d96ca9d6a3cb681ea0e393ffade2ebd3d93a2 Author: Priyank <ps...@hortonworks.com> Date: 2016-05-18T19:14:50Z STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui. STORM-1136: Missed file in previous commit ---
[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1451 @abellina I have added a comment for that div. As far as renaming it to topology-spout-stats is concerned, I prefer the current name, since right below that div there is a div showing stats for the spout like complete latency, etc. I don't want the two to be confused. ---
[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1451#discussion_r66499861 --- Diff: storm-core/src/ui/public/topology.html ---
@@ -384,8 +389,51 @@ Topology resources
 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
 $('#topology-actions [data-toggle="tooltip"]').tooltip();
 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
+
+var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
+$.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
+    if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
+        var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
+        var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
+
+        var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
+        var isJson = function (input) {
+            try {
+                JSON.parse(input);
+            } catch (e) {
+                return false;
+            }
+            return true;
+        };
+        var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
+        var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
+        var data = {};
+        if (kafkaSpoutsValidResults.length > 0) {
+            data.kafkaSpoutsLagResults = [];
+            kafkaSpoutsValidResults.forEach(function(ele) {
+                var spoutLagResult = JSON.parse(ele.spoutLagResult);
+                spoutLagResult.forEach(function(ele2) {
--- End diff --

@abellina The spoutLagResult property is actually an array, hence the flattening. Let me elaborate on the limitations of the valid/invalid approach. The TopologySpoutLag class in storm-core, called by the ui server, does not know whether the spoutLagResult is valid or not. The reason is that the lag information for kafka and other spouts is obtained through a shell call handled by an external module, since we did not want any direct dependency on storm-core. For now we have kafka, but we can add other types of spouts as well. Plus the fields in the ui, or the template, could be different for a different type of spout. The only commonality I found was that if something went wrong while getting lag info for the underlying spout (kafka or other), an error message is sent in the response. This will work for all types of spouts. To do what you are saying, we would have to inspect the response in the TopologySpoutLag class. I felt it did not matter whether the ui or the server did that; I preferred the ui. Let me know if it makes sense or not and if you still think we need to change something. ---
[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1451 @harshach Rebased 1.x branch since there were some merges since the PR was raised. ---
[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1451 @abellina The tables are optional already. If there are no kafka spouts then the response will be an empty array. Can you try running a non kafka spout topology and check? I did. ---
[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1451#discussion_r66382723 --- Diff: storm-core/src/ui/public/topology.html ---
@@ -384,8 +389,51 @@ Topology resources
 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
 $('#topology-actions [data-toggle="tooltip"]').tooltip();
 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
+
+var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
+$.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
+    if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
+        var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
+        var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
+
+        var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
+        var isJson = function (input) {
+            try {
+                JSON.parse(input);
+            } catch (e) {
+                return false;
+            }
+            return true;
+        };
+        var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
+        var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
+        var data = {};
+        if (kafkaSpoutsValidResults.length > 0) {
+            data.kafkaSpoutsLagResults = [];
+            kafkaSpoutsValidResults.forEach(function(ele) {
+                var spoutLagResult = JSON.parse(ele.spoutLagResult);
+                spoutLagResult.forEach(function(ele2) {
--- End diff --

We would still need the for loop, as we need to flatten the results out. I chose to use id in the json passed to the template since the template uses id in other tables for the ui. If you are fine we can leave it as is. ---
[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1451#discussion_r66382357 --- Diff: storm-core/src/ui/public/topology.html ---
@@ -384,8 +389,51 @@ Topology resources
 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
 $('#topology-actions [data-toggle="tooltip"]').tooltip();
 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
+
+var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
+$.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
+    if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
+        var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
+        var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
+
+        var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
+        var isJson = function (input) {
+            try {
+                JSON.parse(input);
+            } catch (e) {
+                return false;
+            }
+            return true;
+        };
+        var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
+        var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
--- End diff --

@abellina If it's an error we only want to show an appropriate message to the user as a string. Also, note that the top-level response is still json. I think string is fine here. ---
[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...
Github user priyank5485 commented on the issue: https://github.com/apache/storm/pull/1451 Opened https://issues.apache.org/jira/browse/STORM-1891 to track trident kafka spouts. ---
[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1451#discussion_r65582081 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java --- @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying offset lag for kafka spout
+ */
+public class KafkaOffsetLagUtil {
+    private static final String OPTION_TOPIC_SHORT = "t";
+    private static final String OPTION_TOPIC_LONG = "topics";
+    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
+    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
+    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
+    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
+    private static final String OPTION_GROUP_ID_SHORT = "g";
+    private static final String OPTION_GROUP_ID_LONG = "groupid";
+    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
+    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
+    private static final String OPTION_PARTITIONS_SHORT = "p";
+    private static final String OPTION_PARTITIONS_LONG = "partitions";
+    private static final String OPTION_LEADERS_SHORT = "l";
+    private static final String OPTION_LEADERS_LONG = "leaders";
+    private static final String OPTION_ZK_SERVERS_SHORT = "z";
+    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
+    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
+    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
+    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
+    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
+
+    public static void main (String args[]) {
+        try {
+            List results;
+            Options options = buildOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
+                printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
+            }
+            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
+                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
+                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_
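Stripped of the option parsing and client plumbing, the quantity this tool reports per partition is the broker's log-end offset minus the consumer group's last committed offset. A minimal sketch of that computation (an illustration only, not the actual KafkaOffsetLagUtil code):

```java
import java.util.HashMap;
import java.util.Map;

public class LagSketch {
    // Hedged sketch: per-partition spout lag = log-end offset - last committed offset,
    // keyed by partition number. Partition and offset values here are plain numbers
    // rather than the Kafka client's TopicPartition/OffsetAndMetadata types.
    static Map<Integer, Long> computeLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : logEndOffsets.entrySet()) {
            // a partition with no committed offset yet is treated as fully behind
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }
}
```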
[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1451#discussion_r65581276 --- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java --- @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying offset lag for kafka spout
+ */
+public class KafkaOffsetLagUtil {
+    private static final String OPTION_TOPIC_SHORT = "t";
--- End diff --

I think it's just convenience. You can use -t or --topics. ---
[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/1451 @harshach Done. ---
[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/1451 @harshach Will do that. Just to confirm, you mean the section header in UI under Topology Stats, right? Also, since the topology can technically have more than one kafka spout and we are showing lags for all spouts there, I was thinking of keeping it plural, maybe "Kafka Spouts Lag". Does that sound okay? ---
[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/1451 @HeartSaVioR I just attached two screenshots: one where it successfully managed to get the lag result, and one with kafka stopped, in which case it just shows a message in the UI indicating the cause. ---
[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/1451 ![kafkaspoutlags_success](https://cloud.githubusercontent.com/assets/5192436/15682583/402a1180-2713-11e6-9ad2-e101cbb1ea4c.png) ![kafkaspoutlags_failure](https://cloud.githubusercontent.com/assets/5192436/15682586/43807626-2713-11e6-8609-870774dbc511.png) ---
[GitHub] storm pull request: STORM-1136: Command line module to return kafk...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1451 STORM-1136: Command line module to return kafka spout offsets lag and… … display in storm ui. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1136 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1451.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1451 commit 83b70af3d2a419e81ea9dc8a962dfccbda2a3255 Author: Priyank <ps...@hortonworks.com> Date: 2016-05-18T19:14:50Z STORM-1136: Command line module to return kafka spout offsets lag and display in storm ui. ---
[GitHub] storm pull request: STORM-1535: Make sure hdfs key tab login happe...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/1351#issuecomment-213021857 Thanks for catching that, @satishd. Removed the comment. ---
[GitHub] storm pull request: STORM-1535: Make sure hdfs key tab login happe...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1351 STORM-1535: Make sure hdfs key tab login happens only once for multip… …le bolts/executors. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1535 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1351.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1351 commit bc488c23420215db847fd8dde5a1006e319521de Author: Priyank <ps...@hortonworks.com> Date: 2016-03-31T19:46:25Z STORM-1535: Make sure hdfs key tab login happens only once for multiple bolts/executors. ---
[GitHub] storm pull request: STORM-1706: Add RELEASE and storm-env.sh to st...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1334 STORM-1706: Add RELEASE and storm-env.sh to storm-diet assembly You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1706 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1334.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1334 commit 5e62b7b36283c1e257395c59237e699067298bd1 Author: Priyank <ps...@hortonworks.com> Date: 2016-04-12T18:46:21Z STORM-1706: Add RELEASE and storm-env.sh to storm-diet assembly ---
[GitHub] storm pull request: STORM-1129: Update ui to use topology name
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/1277#issuecomment-203751947 @abhishekagarwal87 Please look at the $.getJSON method, line 190 in component.html. Now, instead of topologyId being assigned in onReady, I do it in $.getJSON. It's a side effect of squashing commits that makes it hard to look at the commits that addressed review comments. Sorry about that. Do you guys usually keep separate commits and squash in the end? I will do that next time. ---
[GitHub] storm pull request: STORM-1668: Fix silent failing of flux for set...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1282 STORM-1668: Fix silent failing of flux for setting a non-existent property. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1668-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1282.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1282 commit be09c3c1fd05ec1ed8f8d7768c4023691946d0e2 Author: Priyank <ps...@hortonworks.com> Date: 2016-03-30T23:23:05Z STORM-1668: Fix silent failing of flux for setting a non-existent property. ---
[GitHub] storm pull request: STORM-1668: Fix silent failing of flux for set...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1281 STORM-1668: Fix silent failing of flux for setting a non-existent property. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1668 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1281.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1281 commit 436d7752d9e8deca5b4d83f502eb98491548896f Author: Priyank <ps...@hortonworks.com> Date: 2016-03-30T23:23:05Z STORM-1668: Fix silent failing of flux for setting a non-existent property. ---
[GitHub] storm pull request: STORM-1129: Update ui to use topology name
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/1277#discussion_r57950485 --- Diff: storm-core/src/clj/org/apache/storm/ui/core.clj --- @@ -466,6 +466,21 @@ "assignedCpu" (.get_assigned_cpu t)}) "schedulerDisplayResource" (*STORM-CONF* Config/SCHEDULER_DISPLAY_RESOURCE)}))
+(defn get-topology-id [topology-name-or-id]
+  (let [summary ((all-topologies-summary) "topologies")
+        filter-fun-name (fn [topo-summary] (= (topo-summary "name") topology-name-or-id))
+        filter-fun-id (fn [topo-summary] (= (topo-summary "id") topology-name-or-id))
+        matching-topologies-by-name (filter filter-fun-name summary)
+        matching-topologies-by-id (filter filter-fun-id summary)
+        matching-topologies (cond
+                              (not-empty matching-topologies-by-name) matching-topologies-by-name
+                              (not-empty matching-topologies-by-id) matching-topologies-by-id
+                              :else nil)
+        _ (when (empty? matching-topologies)
+            (throw (NotAliveException. (str topology-name-or-id " is not alive"))))
+        id ((first matching-topologies) "id")]
+    id))
--- End diff -- @knusbaum That's true. But if we prioritized ids, wouldn't your topology become inaccessible? I think going forward we should use topology names in the UI and elsewhere so that we can have permanent links, since the name is the identifier given by the user. Support for lookup by id was added for backwards compatibility, as suggested by @revans2 and others. ---
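The name-first, id-second lookup order argued for in this diff can be sketched in plain Java; the class and method names below are illustrative stand-ins, not Storm's actual API, and each summary is modeled as a map with "name" and "id" keys, mirroring the Clojure code.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the lookup in get-topology-id above.
public class TopologyLookup {
    public static String getTopologyId(List<Map<String, String>> summaries, String nameOrId) {
        Optional<Map<String, String>> byName = summaries.stream()
                .filter(t -> nameOrId.equals(t.get("name"))).findFirst();
        Optional<Map<String, String>> byId = summaries.stream()
                .filter(t -> nameOrId.equals(t.get("id"))).findFirst();
        // Prefer the name match, so a topology whose name happens to look
        // like another topology's id stays reachable.
        Map<String, String> match = byName.orElseGet(() -> byId.orElse(null));
        if (match == null) {
            // Stands in for throwing NotAliveException in the real code.
            throw new RuntimeException(nameOrId + " is not alive");
        }
        return match.get("id");
    }
}
```

Passing either the name or the id resolves to the same topology id, which is the backwards-compatibility behavior discussed in the comment.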
[GitHub] storm pull request: STORM-1129: Update ui to use topology name
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/1277#issuecomment-203557013 @abhishekagarwal87 Thanks for the catch. I have updated the PR. Can you please review? ---
[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/854#issuecomment-203246876 @harshach @revans2 I have addressed the comments and raised a PR https://github.com/apache/storm/pull/1277 against the 1.x branch as I could not test my branch off of master due to another issue. Can you please review the PR against the 1.x branch? I will port the same to master by updating this PR. ---
[GitHub] storm pull request: STORM-1129: Update ui to use topology name
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1277 STORM-1129: Update ui to use topology name You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1129-1.x Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1277.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1277 commit 2648d74cad7d1e46ec9bcceb9b3b388cb98cfc8d Author: Priyank <ps...@hortonworks.com> Date: 2016-03-29T05:31:08Z STORM-1129: Update ui to use topology name ---
[GitHub] storm pull request: STORM-1654 HBaseBolt creates tick tuples with ...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/1251#issuecomment-200919442 +1 (NB). @HeartSaVioR I had created https://issues.apache.org/jira/browse/STORM-1626 I will close that one since it's a duplicate of STORM-1654. Please let me know if that's not the case. ---
[GitHub] storm pull request: [STORM-1449] Fix Kafka spout to maintain backw...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/994#issuecomment-173024682 @revans2 I am fine with the changes except for one major concern pointed out by @Parth-Brahmbhatt. I confirmed this with him, and our main concern is the rolling upgrade feature for topologies with the Kafka spout, which would no longer work. ---
[GitHub] storm pull request: [STORM-1449] Fix Kafka spout to maintain backw...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/994#issuecomment-171576791 Just ran into an issue where this fix does not work. Correct me if I am wrong, but a fat storm jar compiled against an older version of storm-kafka will still use byte[], while the newer version of storm now uses ByteBuffer. Hence it throws an exception at runtime once storm is upgraded. I think an upgrade of storm should still work with older topology jars. ---
[GitHub] storm pull request: STORM-1458: Add check to see if nimbus is alre...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/998 STORM-1458: Add check to see if nimbus is already running. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1458 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/998.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #998 commit d62281629afe02a34aba10633c36be546eb70a74 Author: Priyank <ps...@hortonworks.com> Date: 2016-01-08T21:27:03Z STORM-1458: Add check to see if nimbus is already running. ---
[GitHub] storm pull request: STORM-1160: Add hadoop-auth dependency needed ...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/1000 STORM-1160: Add hadoop-auth dependency needed for storm-core hadoop-auth is needed for storm ui in a secured cluster for authentication. Adding the dependency back since it was removed in one of the commits. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1160 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1000.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1000 commit f939fc27fbcc8dd1fc1e3b9d8948f1994fc399af Author: Priyank <ps...@hortonworks.com> Date: 2016-01-09T01:15:51Z STORM-1160: Add hadoop-auth dependency needed for storm-core hadoop-auth is needed for storm ui in a secured cluster for authentication. Adding the dependency back since it was removed in one of the commits. ---
[GitHub] storm pull request: STORM-904: Move bin/storm command line to java...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/662#issuecomment-157804563 @hustfxj I might have missed something, and it's been a long time since I did this, but I could not find any storm command that would not need a waitFor. Can you give an example? If we do need that flag, it should be easy to add and I can change that. @longdafeng I am not sure I understand the point about debugging. You can still put a breakpoint in this java version of the storm command line. I agree with you that it is easier to write new functions in a script, but the whole point of this JIRA is to have one client that works on both Unix-based systems and Windows. The last time I was working on it, we had storm.py for Unix and the storm batch client for Windows, and both of them also had a separate file for environment variables. It is hard to maintain two scripts any time we want to change the storm command. We can ask others for their opinion on which of the two approaches is better. ---
[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/854#issuecomment-154136188 @revans2 @Parth-Brahmbhatt @jerrypeng @harshach Thanks all for the feedback. Besides the changes here, some commits landed on apache/master after this PR was created that added more UI APIs using topology id, making this PR unmergeable. I will sort all of that out and update the PR. ---
[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/854#issuecomment-153819160 @jerrypeng We have a check and it throws the following exception:
Exception in thread "main" java.lang.RuntimeException: Topology with name `wordcount` already exists on cluster
    at backtype.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:231)
    at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:275)
    at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:311)
    at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:292)
    at storm.starter.WordCountTopology.main(WordCountTopology.java:94)
@revans2 Thanks for chiming in. ---
[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/854 STORM-1129: Use topology name instead of id in UI calls. Note that all-topologies-summary has been used to get the topology id from a topology name. That involves a few calls to zookeeper, which is not ideal; however, the UI does not seem to take any significant performance hit. If needed, we can handle it using one of the options below. Since it is a separate performance issue, we can handle it in a separate JIRA.
1. Have the nimbus thrift server cache the summary for topologies so it does not hit zookeeper every time we try to get a topology id from a name.
2. Update the nimbus thrift api with a method that takes options, and use that to do only the minimal necessary interaction with zookeeper for a given option.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1129 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/854.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #854 commit 77b73af52bdc2d691ae172a1a2905a94395c3c70 Author: Priyank <ps...@hortonworks.com> Date: 2015-11-04T00:20:50Z STORM-1129: Use topology name instead of id in UI calls. ---
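Option 1 in the PR description (caching the topology summary on the thrift server) could look roughly like the expiring cache below. The class name, generic shape, and TTL are illustrative assumptions, not Storm code.

```java
import java.util.function.Supplier;

// Hypothetical time-bounded cache for the topology summary, as in option 1 above.
public class SummaryCache<T> {
    private final Supplier<T> loader;  // e.g. the call that actually hits zookeeper
    private final long ttlMillis;
    private T cached;
    private long loadedAt = Long.MIN_VALUE;

    public SummaryCache(Supplier<T> loader, long ttlMillis) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
    }

    public synchronized T get() {
        long now = System.currentTimeMillis();
        if (cached == null || now - loadedAt > ttlMillis) {
            cached = loader.get();  // only this path touches zookeeper
            loadedAt = now;
        }
        return cached;
    }
}
```

Name-to-id lookups within the TTL window would then be served from memory, so the UI's extra calls no longer translate into extra zookeeper round trips.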
[GitHub] storm pull request: STORM-997: Add proxy user functionality for st...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/692#issuecomment-145642316 @revans2 I think what you mentioned above would not happen, i.e. the part where you say that storm running in unsecured mode with hdfs in secured mode will disable security. Please correct me if I am wrong; I am new to this. But the way I imagine it works, the proxy user code (the doAs part) will be executed only after the user is logged in to HDFS. In secured mode the user will already be validated using a keytab. On top of that, hdfs needs config entries in core-site.xml for the proxy user functionality to work. Please look at the configuration section at http://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-common/Superusers.html In that case hdfs would make sure that if the logged-in user is not authorized by that config to impersonate some other user, it throws an exception. Again, there is a good chance I am missing something here, so please correct me if I am wrong. ---
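The ordering argued in this comment, where the action runs only for an already-authenticated user and the impersonation check is enforced server-side from the core-site.xml proxyuser entries, can be sketched as below. This is a toy model, not Hadoop's API: the real connector would go through Hadoop's UserGroupInformation, and the allowed-targets set stands in for the hadoop.proxyuser.* configuration checked by the NameNode.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of the doAs flow described above.
public class ProxyUserSketch {
    private final String loggedInUser;          // already validated (e.g. via keytab)
    private final Set<String> allowedTargets;   // stand-in for hadoop.proxyuser.* config

    public ProxyUserSketch(String loggedInUser, Set<String> allowedTargets) {
        this.loggedInUser = loggedInUser;
        this.allowedTargets = allowedTargets;
    }

    public <T> T doAs(String proxyUser, Supplier<T> action) {
        // The server-side check: the authenticated user must be authorized
        // by config to impersonate proxyUser, otherwise the call fails.
        if (!allowedTargets.contains(proxyUser)) {
            throw new SecurityException(loggedInUser + " is not allowed to impersonate " + proxyUser);
        }
        return action.get();
    }
}
```

The point of the sketch is that impersonation cannot "disable" security: the doAs path only runs after authentication, and an unauthorized target is rejected by configuration.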
[GitHub] storm pull request: STORM-997: Add proxy user functionality for st...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/692#issuecomment-143101710 @Parth-Brahmbhatt @ptgoetz Can you please review this? ---
[GitHub] storm pull request: STORM-1017: Ignore ignoreZkOffsets in case of ...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/754#issuecomment-142448505 @harshach Created a JIRA item for that. @ptgoetz can you please review this as well? ---
[GitHub] storm pull request: Storm-1017: Ignore ignoreZkOffsets in case of ...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/754 Storm-1017: Ignore ignoreZkOffsets in case of worker crash and restart. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1017 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/754.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #754 commit a89b7ff4b7c96f7998e3a0c673937f9ddefd59c9 Author: Priyank <ps...@hortonworks.com> Date: 2015-09-15T06:27:01Z STORM-1017: Ignore ignoreZkOffsets in case of worker crash and restart. commit 56cfdbf8d9b3b6044d7aa05dfb6b501c24b2f277 Author: Priyank <ps...@hortonworks.com> Date: 2015-09-22T16:14:26Z STORM-1017: Ignore ignoreZkOffsets for trident. ---
[GitHub] storm pull request: STORM-1039: Remove commons-codec shading, comm...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/730 STORM-1039: Remove commons-codec shading; commons-codec is used by the hadoop authentication library, which is used during ui authentication in a secured environment. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-1039 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/730.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #730 commit 5b50bb9ecdccfdc9abb189553f3fe8dc7650be57 Author: Priyank <ps...@hortonworks.com> Date: 2015-09-10T00:14:00Z STORM-1039: Remove commons-codec shading, commons-codec is used by hadoop authentication library which is used during ui authentication in secured environment. ---
[GitHub] storm pull request: STORM-997: Add proxy user functionality for st...
Github user priyank5485 commented on the pull request: https://github.com/apache/storm/pull/692#issuecomment-136490586 @harshach thanks for the review. Yes, I will do it. I realized that it needs to be added for trident after creating the pull request. ---
[GitHub] storm pull request: STORM-997: Add proxy user functionality for st...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/692 STORM-997: Add proxy user functionality for storm hdfs connector. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-997 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/692.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #692 commit 96903303ccd9087fad27cf4bc4081be1332344f6 Author: Priyank ps...@hortonworks.com Date: 2015-08-20T07:21:16Z STORM-997: Add proxy user functionality for storm hdfs connector. ---
[GitHub] storm pull request: STORM-965: Fix excessive logging
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/686 STORM-965: Fix excessive logging The reason for this pull request is that when a non-secure client tried to connect to the nimbus host in a secure cluster, an exception was thrown and a null transport object was returned. This null object was in turn used by a thrift library class, which threw a NullPointerException. Both the class where the exception was thrown and the class where it was caught were logging it, creating unnecessarily lengthy log statements. Disabling logging in storm code by changing it from error to debug took care of one part, but to work around the exception being logged in the thrift jar, a NoOpTTrasport.java had to be created. Two main things to consider during review:
1. Change from Logger.error to Logger.debug in KerberosSaslTransportPlugin.java
2. Early return of false in SaslTransportPlugin.java, which is presumed, and tested with one nimbus client, to work okay. Note: the TProcessor interface's process method has no documentation saying what the returned boolean indicates.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm STORM-965 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/686.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #686 commit f29de472a21a9f5936cb13d26cc5a36669c48164 Author: Priyank ps...@hortonworks.com Date: 2015-08-13T22:42:24Z STORM-965: Fix excessive logging ---
[GitHub] storm pull request: STORM-904: Move bin/storm command line to java...
Github user priyank5485 commented on a diff in the pull request: https://github.com/apache/storm/pull/662#discussion_r36324914 --- Diff: storm-core/src/jvm/backtype/storm/utils/StormCommandExecutor.java --- @@ -0,0 +1,785 @@
+package backtype.storm.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import clojure.lang.IFn;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.SystemUtils;
+
+/**
+ * Created by pshah on 7/17/15.
+ */
+abstract class StormCommandExecutor {
+    final String NIMBUS_CLASS = "backtype.storm.daemon.nimbus";
+    final String SUPERVISOR_CLASS = "backtype.storm.daemon.supervisor";
+    final String UI_CLASS = "backtype.storm.ui.core";
+    final String LOGVIEWER_CLASS = "backtype.storm.daemon.logviewer";
+    final String DRPC_CLASS = "backtype.storm.daemon.drpc";
+    final String REPL_CLASS = "clojure.main";
+    final String ACTIVATE_CLASS = "backtype.storm.command.activate";
+    final String DEACTIVATE_CLASS = "backtype.storm.command.deactivate";
+    final String REBALANCE_CLASS = "backtype.storm.command.rebalance";
+    final String LIST_CLASS = "backtype.storm.command.list";
+    String stormHomeDirectory;
+    String userConfDirectory;
+    String stormConfDirectory;
+    String clusterConfDirectory;
+    String stormLibDirectory;
+    String stormBinDirectory;
+    String stormLog4jConfDirectory;
+    String configFile = "";
+    String javaCommand;
+    List<String> configOptions = new ArrayList<String>();
+    String stormExternalClasspath;
+    String stormExternalClasspathDaemon;
+    String fileSeparator;
+    final List<String> COMMANDS = Arrays.asList("jar", "kill", "shell",
+        "nimbus", "ui", "logviewer", "drpc", "supervisor",
+        "localconfvalue", "remoteconfvalue", "repl", "classpath",
+        "activate", "deactivate", "rebalance", "help", "list",
+        "dev-zookeeper", "version", "monitor", "upload-credentials",
+        "get-errors");
+
+    public static void main (String[] args) {
+        for (String arg : args) {
+            System.out.println("Argument " + "is " + arg);
+        }
+        StormCommandExecutor stormCommandExecutor;
+        if (System.getProperty("os.name").startsWith("Windows")) {
+            stormCommandExecutor = new WindowsStormCommandExecutor();
+        } else {
+            stormCommandExecutor = new UnixStormCommandExecutor();
+        }
+        stormCommandExecutor.initialize();
+        stormCommandExecutor.execute(args);
+    }
+
+    StormCommandExecutor () {
+    }
+
+    abstract void initialize ();
+
+    abstract void execute (String[] args);
+
+    void callMethod (String command, List<String> args) {
+        Class implementation = this.getClass();
+        String methodName = command.replace("-", "") + "Command";
+        try {
+            Method method = implementation.getDeclaredMethod(methodName, List.class);
+            method.invoke(this, args);
+        } catch (NoSuchMethodException ex) {
+            System.out.println("No such method exception occured while trying " +
+                "to run storm method " + command);
+        } catch (IllegalAccessException ex) {
+            System.out.println("Illegal access exception occured while trying " +
+                "to run storm method " + command);
+        } catch (IllegalArgumentException ex) {
+            System.out.println("Illegal argument exception occured while trying " +
+                "to run storm method " + command);
+        } catch (InvocationTargetException ex) {
+            System.out.println("Invocation target exception occured while trying " +
+                "to run storm method " + command);
+        }
+    }
+}
+
+class UnixStormCommandExecutor extends StormCommandExecutor {
+
+    UnixStormCommandExecutor () {
+    }
+
+    void initialize () {
+        Collections.sort(this.COMMANDS);
+        this.fileSeparator = System.getProperty("file.separator");
+        this.stormHomeDirectory = System.getenv("STORM_BASE_DIR");
+        this.userConfDirectory = System.getProperty("user.home") +
+            this.fileSeparator + ".storm";
+        this.stormConfDirectory = System.getenv("STORM_CONF_DIR");
+        this.clusterConfDirectory = this.stormConfDirectory == null ? (this.stormHomeDirectory + this.fileSeparator
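The reflection-based dispatch in callMethod above ("dev-zookeeper" maps to a devzookeeperCommand(List) method, and so on) reduces to a small runnable sketch. The class name and the single registered command here are illustrative, not taken from the PR.

```java
import java.lang.reflect.Method;
import java.util.List;

// Minimal version of the callMethod reflection dispatch shown in the diff above.
public class CommandDispatcher {
    public String lastRun = "";

    // A sample command handler; callMethod finds it by naming convention.
    public void listCommand(List<String> args) {
        lastRun = "list:" + args;
    }

    public void callMethod(String command, List<String> args) {
        // "dev-zookeeper" -> "devzookeeperCommand", "list" -> "listCommand", etc.
        String methodName = command.replace("-", "") + "Command";
        try {
            Method method = getClass().getDeclaredMethod(methodName, List.class);
            method.invoke(this, args);
        } catch (ReflectiveOperationException ex) {
            lastRun = "error:" + ex;
        }
    }
}
```

The convention keeps the command table out of a switch statement: adding a new subcommand only requires adding a matching xxxCommand(List) method.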
[GitHub] storm pull request: STORM-904: Move bin/storm command line to java...
GitHub user priyank5485 opened a pull request: https://github.com/apache/storm/pull/662 STORM-904: Move bin/storm command line to java. You can merge this pull request into a Git repository by running: $ git pull https://github.com/priyank5485/storm storm-904 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/662.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #662 commit 193f59e027a8ab95d72c8e386d66f0b90f9b86f9 Author: Priyank ps...@hortonworks.com Date: 2015-07-17T23:45:13Z Remove python and add a java class to support storm cli. Add a java class that encapsulates storm cli functionality currently coded in two different places. storm, storm.py for bash and storm.cmd for windows. Now both storm and storm.cmd will call the java class eliminating python code and putting all the code in one place for bash and windows respectively. commit 793781cfd55c56f7184910fd744341351f6cc4bf Author: Priyank ps...@hortonworks.com Date: 2015-07-20T23:04:48Z Add apace commons cli library for command line parsing commit a425fc50ab9b518337d0c095ae80e9d78221e3fd Author: Priyank ps...@hortonworks.com Date: 2015-07-21T17:44:30Z Adding version for dependency in pom file. commit 5a4ec36ac17af69297dfc73aaa682b8e43edf5de Author: Priyank ps...@hortonworks.com Date: 2015-07-21T21:23:54Z Sample code to test for apache commons cli. commit 1a8240a558516110a5c14d3a4bb0b177ab76090b Author: Priyank ps...@hortonworks.com Date: 2015-07-22T23:40:40Z Removing apache commons cli, renaming and some intiialization code. commit b69e295e81216e992187737cc069ff66d2e74687 Author: Priyank ps...@hortonworks.com Date: 2015-07-23T10:14:48Z Some redesigning/refactoring. commit 8c2fbeadbf699b5b286479649d133623c92bc85c Author: Priyank ps...@hortonworks.com Date: 2015-07-23T21:49:43Z Add printUsage mainly. 
commit 74f45c7c9d6ae813635c91ba14fdb1cd3dd24c74 Author: Priyank ps...@hortonworks.com Date: 2015-07-24T18:49:15Z Call storm command methods using reflection. commit 10ba25f8c81e2fdc34f4ae3e0e2cdf9ecd142a4a Author: Priyank ps...@hortonworks.com Date: 2015-07-24T19:25:01Z Add method boxes without code for other storm commands. commit fb2022db8dda7a5183ee5a2cc64a7ef035006c80 Author: Priyank ps...@hortonworks.com Date: 2015-07-27T17:59:20Z Initial executeStorm method creating a new process. Need to test storm daemons if they can be killed using shutdown hooks. commit 493b0dfeb7be2daa0a54e7dec2f3f716d0af0e26 Author: Priyank ps...@hortonworks.com Date: 2015-07-29T07:02:49Z Ship some more code over from python. commit 3082d4f5ea2d66babe232e5ff49506b322385473 Author: Priyank ps...@hortonworks.com Date: 2015-07-29T18:49:57Z Fix an exception which failed to start the nimbus command. commit c616068d90dd47fa61fbb670c2853805d7ccb324 Author: Priyank ps...@hortonworks.com Date: 2015-07-29T21:53:27Z Use exec to replace current process rather than creating a new one. commit 06f4dcc17d8dcf3c81094557ed97ff117d6410ed Author: Priyank ps...@hortonworks.com Date: 2015-07-30T08:37:08Z Add some commented code. commit da0ad60438841a1f874c115cef27190a59401f4f Author: Priyank ps...@hortonworks.com Date: 2015-07-30T15:45:52Z Add nil check for strings returned from conclave commit 0b242adf0bc261e9140c3587b3eedd2141e31718 Author: Priyank ps...@hortonworks.com Date: 2015-07-30T19:04:35Z Fix an issue with passing options to storm command. commit 0bb15782c171d336be905d2f5143865e288a8239 Author: Priyank ps...@hortonworks.com Date: 2015-07-31T00:37:07Z STORM-904: Add shutdown hook thread to kill subprocess. Does not work for kill -9. Need to debug. ---