[GitHub] storm issue #2760: STORM-3123: Storm Kafka Monitor cannot connect to Kafka o...

2018-07-12 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2760
  
@VipinRathor You should also update 

https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
and

https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java

That will make sure that lag shows up in the Storm UI as well when the spout 
is talking to Kafka on an SSL listener. Can you also test it?


---


[GitHub] storm pull request #2760: STORM-3123: Storm Kafka Monitor cannot connect to ...

2018-07-12 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2760#discussion_r202213538
  
--- Diff: 
external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
 ---
@@ -145,7 +148,8 @@ public static void main(String args[]) {
 NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery =
 new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
  commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG),
- commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
+ commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol,
--- End diff --

I suggest adding a check in the old spout's if block and throwing an error if 
this option is provided with the old spout.
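
A sketch of such a guard, for illustration (the security-protocol option constant is a hypothetical name; printUsageAndExit and OPTION_OLD_CONSUMER_LONG appear in the surrounding code):

```java
// Hypothetical check in the old-consumer if block: the security protocol
// option only applies to the new consumer, so fail fast on the combination.
if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)
        && commandLine.hasOption(OPTION_SECURITY_PROTOCOL_LONG)) {
    printUsageAndExit(options, OPTION_SECURITY_PROTOCOL_LONG
            + " is not supported with " + OPTION_OLD_CONSUMER_LONG);
}
```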


---


[GitHub] storm issue #2670: STORM-3068: STORM_JAR_JVM_OPTS are not passed to storm-ka...

2018-05-10 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2670
  
+1(NB)


---


[GitHub] storm issue #2670: STORM-3068: STORM_JAR_JVM_OPTS are not passed to storm-ka...

2018-05-10 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2670
  
@rajkrrsingh When you say STORM_JAR_JVM_OPTS is not propagating to the java 
args, where is it not propagating from? I don't see it being set anywhere in 
this script, and I see that it sources the storm-env.sh script. So I was just 
suggesting adding a check, like the one we have for JAVA_HOME, to handle the 
case where it is not defined.


---


[GitHub] storm issue #2670: STORM-3068: STORM_JAR_JVM_OPTS are not passed to storm-ka...

2018-05-10 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2670
  
@rajkrrsingh Is the presumption that STORM_JAR_JVM_OPTS will be set? Have 
you checked that it's okay when the variable is not set? Can you also add a 
note about this change to the README?


---


[GitHub] storm pull request #2665: STORM-2988 Error on initialization of server mk-wo...

2018-05-07 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2665#discussion_r186564359
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java ---
@@ -67,7 +68,7 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormCon
 }
 
 public static String getMetricsJMXDomain(Map reporterConf) {
-return Utils.getString(reporterConf, JMX_DOMAIN);
+return Utils.getString(reporterConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null);
--- End diff --

@arunmahadevan I am not saying we should return that as the default value. I am 
suggesting we use it as the key to look up the conf map. 


---


[GitHub] storm pull request #2665: STORM-2988 Error on initialization of server mk-wo...

2018-05-07 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2665#discussion_r186560080
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java ---
@@ -67,7 +68,7 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormCon
 }
 
 public static String getMetricsJMXDomain(Map reporterConf) {
-return Utils.getString(reporterConf, JMX_DOMAIN);
+return Utils.getString(reporterConf.get(Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN), null);
--- End diff --

The JMX_DOMAIN key used initially is different from 
Config.STORM_DAEMON_METRICS_REPORTER_PLUGIN_DOMAIN.

I think we should stick to the JMX_DOMAIN config key unless @ptgoetz thinks 
this change is good.
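
For clarity, a sketch of the alternative being suggested, keeping JMX_DOMAIN as the lookup key and assuming the two-argument Utils.getString already used in the diff:

```java
public static String getMetricsJMXDomain(Map reporterConf) {
    // Look the domain up under the reporter's own JMX_DOMAIN key,
    // defaulting to null when it is absent.
    return Utils.getString(reporterConf.get(JMX_DOMAIN), null);
}
```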


---


[GitHub] storm issue #1505: STORM-1136: Command line module to return kafka spout off...

2017-11-13 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1505
  
@lichenglongqi You should not have to do anything. The lag should 
automatically show up in the Storm UI on the topology page after you submit a 
topology that has a Kafka spout in it. 1.0.4 does not have that feature. Can you 
try 1.1.1?


---


[GitHub] storm issue #2150: STORM-2541: Fix storm-kafka-client manual subscription no...

2017-06-25 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2150
  
@srdo @harshach Some comments related to storm-kafka-monitor 

[~Srdo] Regarding the first point: because the lag request is an HTTP pull 
request from the UI, as long as 
kafkaSpoutConfig.getSubscription().getTopicsString() returns the correct value 
it will work, since the open method would have been called eventually. The only 
change we would need is that when the rebalance listener is invoked, we keep 
track of the topics currently subscribed. For example, PatternSubscription can 
have an instance variable, say Set<String> topics, that is correctly updated 
any time onPartitionsRevoked or onPartitionsAssigned is called. We can use that 
instance variable to return the value when getTopicsString is called on that 
object. Does that work?
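
A minimal sketch of that idea, assuming Kafka's ConsumerRebalanceListener callbacks and eager rebalancing (the full assignment is revoked before reassignment); the class and field names are illustrative, not the actual PatternSubscription code:

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class TopicTrackingListener implements ConsumerRebalanceListener {
    // topics currently assigned, kept up to date by the rebalance callbacks
    private final Set<String> topics = new TreeSet<>();

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        topics.clear(); // eager rebalance revokes the full assignment
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            topics.add(tp.topic());
        }
    }

    // what getTopicsString could return to the lag tool
    String getTopicsString() {
        return String.join(",", topics);
    }
}
```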

Regarding point 2, we could move the Kafka client version to the 
storm-kafka-monitor module. Can you elaborate a bit more on your concern? Is it 
that if the Kafka cluster is upgraded, storm-kafka-monitor won't work? In that 
case the storm-kafka-client module will also have to be updated and the 
topology jar rebuilt, correct? I think in general we have a certain 
compatibility restriction, in that a given Storm release works with, or has 
been tested with, a certain version of the Kafka clients. Correct me if I am wrong.

Regarding point 3, the main reason for creating a separate module and calling 
a bash script from the Storm UI is so that storm-core does not have a direct 
Kafka dependency, which made sense. For Windows, Cygwin can be a workaround. 
Plus, I don't know how many Windows users we have. We can start a thread to see 
if there is really a user base for which we need to support that use case.

I don't know the details of how metrics would work. We could get 
opinions from other people. If that is indeed the right way to go about it, then 
I am all for changing it.



---


[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

2017-06-21 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2155
  
+1(NB) given that people prefer keeping the Builder class around.


---


[GitHub] storm pull request #2168: STORM-2562: Use stronger key size than default for...

2017-06-20 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/2168

STORM-2562: Use stronger key size than default for blow fish key gene…

…rator and get rid of stack trace

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-2562-1.x

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2168


commit cee60478cd557cb96dab63fbb9e5f8d262e91487
Author: Priyank <ps...@hortonworks.com>
Date:   2017-06-20T19:01:13Z

STORM-2562: Use stronger key size than default for blow fish key generator 
and get rid of stack trace




---


[GitHub] storm pull request #2167: STORM-2562: Use stronger key size than default for...

2017-06-20 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/2167

STORM-2562: Use stronger key size than default for blow fish key gene…

…rator and get rid of stack trace

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-2562

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2167.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2167


commit 3baf61bad84001cd3ed21d30dc56b3df60e75e20
Author: Priyank <ps...@hortonworks.com>
Date:   2017-06-20T19:01:13Z

STORM-2562: Use stronger key size than default for blow fish key generator 
and get rid of stack trace




---


[GitHub] storm issue #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid duplicatin...

2017-06-12 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2155
  
@srdo Overall, this looks good. But I also think we should get rid of the 
Builder class entirely. The reason is that it does not work well with Flux YAML, 
since you create the object using builder.build(). Let me know your thoughts on 
that.


---


[GitHub] storm pull request #2155: STORM-2548: Simplify KafkaSpoutConfig to avoid dup...

2017-06-12 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2155#discussion_r121530308
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 ---
@@ -79,16 +78,46 @@
 UNCOMMITTED_EARLIEST,
 UNCOMMITTED_LATEST }
 
+/**
+ * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
+ * Users who need different deserializers should use the Builder constructors instead,
+ * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)}
+ * @param bootstrapServers The bootstrap servers for the consumer
+ * @param topics The topics to subscribe to
+ * @return The new builder
+ */
 public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
-return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
+return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics));
 }
 
+/**
+ * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
+ * Users who need different deserializers should use the Builder constructors instead,
+ * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)}
+ * @param bootstrapServers The bootstrap servers for the consumer
+ * @param topics The topics to subscribe to
+ * @return The new builder
+ */
 public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
-return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
+return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics));
 }
 
+/**
+ * Convenience method to get a Builder for String key/value spouts. Sets the key/value Deserializers to the StringDeserializer class.
+ * Users who need different deserializers should use the Builder constructors instead,
+ * and set the deserializer classes via {@link Builder#setProp(java.lang.String, java.lang.Object)}
+ * @param bootstrapServers The bootstrap servers for the consumer
+ * @param topics The topic pattern to subscribe to
+ * @return The new builder
+ */
 public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
-return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
+return setDefaultStringDeserializers(new Builder<>(bootstrapServers, topics));
+}
+
+private static Builder<String, String> setDefaultStringDeserializers(Builder<String, String> builder) {
+builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
--- End diff --

I think ByteArrayDeserializer makes more sense. Not a strong preference 
though.


---


[GitHub] storm issue #2126: Add the option to set client.id to storm-kafka and storm-...

2017-05-19 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2126
  
@carl34 Good point. I did not realize it was final until you brought it up. 
In that case I am okay with your change for the storm-kafka module. For the 
storm-kafka-client module I still prefer avoiding the setClientId method. I 
would rather use setProp instead. Can you use setProp on the builder 
for now? And when I create a PR for refactoring storm-kafka-client, you can 
comment about any issues that you might run into because of the changes in that 
PR.


---


[GitHub] storm issue #2126: Add the option to set client.id to storm-kafka and storm-...

2017-05-19 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2126
  
@carl34 
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java#L32
 is already a public property. Are you saying passing it as a constructor 
argument is clearer than setting a public property? If so, I don't agree 
with you on that. That extra constructor does not add any value. Plus, the 
community is encouraged to use the new spout. So unless there is a blocker or 
critical bug in the old spout I prefer not changing any code there.

For storm-kafka-client I plan to raise a PR to get rid of the builder 
pattern. The builder pattern relies on calling builder.build() and passing the 
returned object to the constructor. Unfortunately, that does not work well with 
Flux. Hence the setProp method, either on the builder (or maybe in the future 
directly on KafkaSpoutConfig), is encouraged. There are many other properties 
on the Kafka consumer API, and it does not make sense to expose each one of 
them as a setXXX Java method. Without those setXXX methods it is a much cleaner 
API for the spout and its config object. IntelliSense should not be the top 
priority here; I would prefer a cleaner API. 
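
For illustration, a sketch of the setProp style, assuming the builder's setters are chainable (the broker address, topic, and client id are placeholders):

```java
// Set client.id through the generic consumer property rather than a
// dedicated setClientId method.
KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
        .builder("localhost:9092", "my-topic")
        .setProp(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id")
        .build();
```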


---


[GitHub] storm issue #2126: Add the option to set client.id to storm-kafka and storm-...

2017-05-19 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2126
  
@carl34 We already have a setProp method in KafkaSpoutConfig in 
storm-kafka-client that can be used to set any consumer property. I don't think 
this adds any value. Can you elaborate on why you need that method? As far as 
the storm-kafka module is concerned, that property is already public. I don't see 
a reason for adding one more constructor argument. 


---


[GitHub] storm pull request #2116: STORM-2512: Make constructor public and add one mo...

2017-05-13 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/2116

STORM-2512: Make constructor public and add one more builder constructor



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-2512

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2116.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2116


commit 9c616fd5ee887e2cf17582b4e80adc3b82158c37
Author: Priyank <ps...@hortonworks.com>
Date:   2017-05-13T06:32:08Z

STORM-2512: Make constructor public and add one more builder constructor




---


[GitHub] storm pull request #2115: STORM-2512: Make constructor public and add one mo...

2017-05-13 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/2115

STORM-2512: Make constructor public and add one more builder constructor



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-2512-1.x

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2115.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2115


commit 5ff7865cf0b86f40e99b54e789fa60b8843191aa
Author: Priyank <ps...@hortonworks.com>
Date:   2017-05-13T06:32:08Z

STORM-2512: Make constructor public and add one more builder constructor




---


[GitHub] storm issue #2081: [STORM-2482] Refactor the Storm auto credential plugins t...

2017-04-26 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/2081
  
+1(non-binding)


---


[GitHub] storm pull request #2081: [STORM-2482] Refactor the Storm auto credential pl...

2017-04-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2081#discussion_r113541511
  
--- Diff: 
external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
 ---
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.common;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.storm.security.INimbusCredentialPlugin;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.ICredentialsRenewer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The base class that for auto credential plugins that abstracts out some 
of the common functionality.
+ */
+public abstract class AbstractAutoCreds implements IAutoCredentials, 
ICredentialsRenewer, INimbusCredentialPlugin {
+private static final Logger LOG = 
LoggerFactory.getLogger(AbstractAutoCreds.class);
+public static final String CONFIG_KEY_RESOURCES = "resources";
+
+private List configKeys = new ArrayList<>();
+private Map<String, Map<String, Object>> configMap = new HashMap<>();
+
+@Override
+public void prepare(Map conf) {
+doPrepare(conf);
+String configKeyString = getConfigKeyString();
+if (conf.containsKey(configKeyString)) {
+configKeys.addAll((List) conf.get(configKeyString));
+for (String key : configKeys) {
+if (conf.containsKey(key)) {
+Map<String, Object> config = (Map<String, Object>) 
conf.get(key);
+configMap.put(key, config);
+LOG.info("configKey = {}, config = {}", key, config);
+}
+}
+}
+}
+
+@Override
+public void populateCredentials(Map<String, String> credentials, Map 
conf) {
+try {
+if (configKeys != null) {
+Map<String, Object> updatedConf = updateConfigs(conf);
+for (String configKey : configKeys) {
+credentials.put(getCredentialKey(configKey),
+
DatatypeConverter.printBase64Binary(getHadoopCredentials(updatedConf, 
configKey)));
+}
+} else {
+credentials.put(getCredentialKey(StringUtils.EMPTY),
+
DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+}
+LOG.info("Tokens added to credentials map.");
+} catch (Exception e) {
+LOG.error("Could not populate credentials.", e);
+}
+}
+
+private Map<String, Object> updateConfigs(Map topologyConf) {
+Map<String, Object> res = new HashMap<>(topologyConf);
+if (configKeys != null) {
+for (String configKey : configKeys) {
+if (!res.containsKey(configKey) && 
configMap.containsKey(configKey)) {
+res.put(configKey, configMap.

---

[GitHub] storm pull request #2081: [STORM-2482] Refactor the Storm auto credential pl...

2017-04-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2081#discussion_r113535108
  
--- Diff: 
external/storm-autocreds/src/main/java/org/apache/storm/common/AbstractAutoCreds.java
 ---
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.common;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.storm.security.INimbusCredentialPlugin;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.ICredentialsRenewer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The base class that for auto credential plugins that abstracts out some 
of the common functionality.
+ */
+public abstract class AbstractAutoCreds implements IAutoCredentials, 
ICredentialsRenewer, INimbusCredentialPlugin {
+private static final Logger LOG = 
LoggerFactory.getLogger(AbstractAutoCreds.class);
+public static final String CONFIG_KEY_RESOURCES = "resources";
+
+private List configKeys = new ArrayList<>();
+private Map<String, Map<String, Object>> configMap = new HashMap<>();
+
+@Override
+public void prepare(Map conf) {
+doPrepare(conf);
+String configKeyString = getConfigKeyString();
+if (conf.containsKey(configKeyString)) {
+configKeys.addAll((List) conf.get(configKeyString));
+for (String key : configKeys) {
+if (conf.containsKey(key)) {
+Map<String, Object> config = (Map<String, Object>) 
conf.get(key);
+configMap.put(key, config);
+LOG.info("configKey = {}, config = {}", key, config);
+}
+}
+}
+}
+
+@Override
+public void populateCredentials(Map<String, String> credentials, Map 
conf) {
+try {
+if (configKeys != null) {
+Map<String, Object> updatedConf = updateConfigs(conf);
+for (String configKey : configKeys) {
+credentials.put(getCredentialKey(configKey),
+
DatatypeConverter.printBase64Binary(getHadoopCredentials(updatedConf, 
configKey)));
+}
+} else {
+credentials.put(getCredentialKey(StringUtils.EMPTY),
+
DatatypeConverter.printBase64Binary(getHadoopCredentials(conf)));
+}
+LOG.info("Tokens added to credentials map.");
+} catch (Exception e) {
+LOG.error("Could not populate credentials.", e);
+}
+}
+
+private Map<String, Object> updateConfigs(Map topologyConf) {
+Map<String, Object> res = new HashMap<>(topologyConf);
+if (configKeys != null) {
+for (String configKey : configKeys) {
+if (!res.containsKey(configKey) && 
configMap.containsKey(configKey)) {
+res.put(configKey, configMap.

---

[GitHub] storm pull request #2081: [STORM-2482] Refactor the Storm auto credential pl...

2017-04-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2081#discussion_r113517229
  
--- Diff: docs/storm-hbase.md ---
@@ -56,22 +56,43 @@ The approach described above requires that all 
potential worker hosts have "stor
 multiple topologies on a cluster , each with different hbase user, you 
will have to create multiple keytabs and distribute
 it to all workers. Instead of doing that you could use the following 
approach:
 
-Your administrator can configure nimbus to automatically get delegation 
tokens on behalf of the topology submitter user.
-The nimbus need to start with following configurations:
+Your administrator can configure nimbus to automatically get delegation 
tokens on behalf of the topology submitter user. The nimbus should be started 
with following configurations:
 
+```
 nimbus.autocredential.plugins.classes : 
["org.apache.storm.hbase.security.AutoHBase"] 
 nimbus.credential.renewers.classes : 
["org.apache.storm.hbase.security.AutoHBase"] 
 hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of 
hbase super user that can impersonate other users.)
 hbase.kerberos.principal: "superu...@example.com"
-nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by 
default expire every 7 days and can not be renewed, 
-if you have custom settings for hbase.auth.token.max.lifetime in 
hbase-site.xml than you should ensure this value is 
-atleast 1 hour less then that.)
+nimbus.credential.renewers.freq.secs : 518400 (6 days, hbase tokens by 
default expire every 7 days and can not be renewed,  if you have custom 
settings for hbase.auth.token.max.lifetime in hbase-site.xml than you should 
ensure this value is atleast 1 hour less then that.)
+```
 
 Your topology configuration should have:
-topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 
+
+```
+topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"]
+```
 
 If nimbus did not have the above configuration you need to add it and then 
restart it. Ensure the hbase configuration 
-files(core-site.xml,hdfs-site.xml and hbase-site.xml) and the storm-hbase 
jar with all the dependencies is present in nimbus's classpath. 
+files(core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase 
jar with all the dependencies is present in nimbus's classpath.
+
+As an alternative to adding the configuration files (core-site.xml, 
hdfs-site.xml and hbase-site.xml) to the classpath, you could specify the 
configurations as a part of the topology configuration. E.g. in you custom 
storm.yaml (or -c option while submitting the topology),
+
+```
+hbaseCredentialsConfigKeys : ["cluster1", "cluster2"] (the hbase clusters 
you want to fetch the tokens from)
+cluster1: [{"config1": "value1", "config2": "value2", ... }] (A map of 
config key-values specific to cluster1)
+cluster2: [{"config1": "value1", "hbase.keytab.file": 
"/path/to/keytab/for/cluster2/on/nimubs", "hbase.kerberos.principal": 
"cluster2u...@example.com"}] (here along with other configs, we have custom 
keytab and principal for "cluster2" which will override the keytab/principal 
specified at topology level)
+```
+
+Instead of specifying key values you may also directly specify the 
resource files for e.g.,
+
+```
+cluster1: [{"resources": ["/path/to/core-site1.xml", 
"/path/to/hbase-site1.xml"]}]
+cluster2: [{"resources": ["/path/to/core-site2.xml", 
"/path/to/hbase-site2.xml"]}]
+```
+
+Storm will download the tokens separately for each of the clusters and 
populate it into the subject and also renew the tokens periodically. 
+This way it would be possible to run multiple bolts connecting to separate 
HBase cluster within the same topology.
+
--- End diff --

Just a few lines below, it says "As nimbus is impersonating..." and it 
mentions storm.kerberos.principal. Is that right, or should we change it to 
hbase.kerberos.principal?


---


[GitHub] storm pull request #2081: [STORM-2482] Refactor the Storm auto credential pl...

2017-04-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2081#discussion_r113515141
  
--- Diff: docs/SECURITY.md ---
@@ -423,16 +423,18 @@ nimbus.impersonation.acl:
 
 ### Automatic Credentials Push and Renewal
 Individual topologies have the ability to push credentials (tickets and 
tokens) to workers so that they can access secure services.  Exposing this to 
all of the users can be a pain for them.
-To hide this from them in the common case plugins can be used to populate 
the credentials, unpack them on the other side into a java Subject, and also 
allow Nimbus to renew the credentials if needed.
-These are controlled by the following configs. topology.auto-credentials 
is a list of java plugins, all of which must implement IAutoCredentials 
interface, that populate the credentials on gateway 
-and unpack them on the worker side. On a kerberos secure cluster they 
should be set by default to point to 
org.apache.storm.security.auth.kerberos.AutoTGT.  
-nimbus.credential.renewers.classes should also be set to this value so 
that nimbus can periodically renew the TGT on behalf of the user.
+To hide this from them in the common case plugins can be used to populate 
the credentials, unpack them on the other side into a java Subject, and also 
allow Nimbus to renew the credentials if needed. These are controlled by the 
following configs.
+ 
+`topology.auto-credentials` is a list of java plugins, all of which must 
implement the `IAutoCredentials` interface, that populate the credentials on 
gateway 
--- End diff --

Is the gateway here Nimbus, or a NameNode-like entity? Should we rename it to 
nimbus?


---


[GitHub] storm pull request #1978: STORM-2387 Handle tick tuples properly for Bolts i...

2017-02-28 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1978#discussion_r103527554
  
--- Diff: 
external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java
 ---
@@ -76,30 +77,33 @@ public void prepare(Map stormConf, TopologyContext 
context, OutputCollector coll
 
 @Override
 public void execute(final Tuple tuple) {
-  Future future = 
tranquilizer.send((druidEventMapper.getEvent(tuple)));
-LOG.debug("Sent tuple : [{}]" , tuple);
+if (TupleUtils.isTick(tuple)) {
--- End diff --

I agree with @satishd. Right now, the onTickTuple method is in the abstract base 
classes of different connectors and is not even used. Taking Satish's approach is 
a little more work, since we will have to change the bolts that actually process 
tick tuples to separate out the onProcess and onTickTuple logic. But I think 
it's worth it. @dossett I did not exactly understand the problem with exceptions. 
Can't we have a catch-all and rethrow as a RuntimeException?
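
A minimal sketch of that separation, assuming Storm's TupleUtils.isTick (used in the diff above); the class name and the onTickTuple/onProcess signatures are illustrative:

```java
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;

// The base class routes tick tuples, and concrete connector bolts
// implement onTickTuple/onProcess separately.
public abstract class TickAwareBolt extends BaseRichBolt {
    @Override
    public void execute(Tuple tuple) {
        try {
            if (TupleUtils.isTick(tuple)) {
                onTickTuple(tuple);   // e.g. flush buffered events
            } else {
                onProcess(tuple);     // normal per-tuple logic
            }
        } catch (Exception e) {
            // catch-all and rethrow, as suggested above
            throw new RuntimeException(e);
        }
    }

    protected abstract void onTickTuple(Tuple tuple);

    protected abstract void onProcess(Tuple tuple);
}
```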


---


[GitHub] storm issue #1815: STORM-2235 Introduce new option: 'add remote repositories...

2016-12-07 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1815
  
+1 (non-binding)


---


[GitHub] storm pull request #1806: STORM-2226: Fix kafka spout offset lag ui for kerb...

2016-11-29 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1806

STORM-2226: Fix kafka spout offset lag ui for kerberized kafka



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-2226

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1806.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1806


commit 8cd0507003629110598c2e6f774523a3e62665fc
Author: Priyank <ps...@hortonworks.com>
Date:   2016-11-30T04:56:01Z

STORM-2226: Fix kafka spout offset lag ui for kerberized kafka




---


[GitHub] storm issue #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-09-04 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1665
  
+1 (non-binding)


---


[GitHub] storm issue #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-09-02 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1665
  
@vesense Not sure if you saw my comment about exiting with a code of 1 in 
getOldConsumerOffsetsFromZk. Can you check that comment and address it?


---


[GitHub] storm pull request #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-09-01 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1665#discussion_r77237088
  
--- Diff: 
external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
 ---
@@ -89,6 +89,10 @@ public static void main (String args[]) {
 printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " 
and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required  with " +
 OPTION_OLD_CONSUMER_LONG);
 }
+String zkNode = 
commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG);
--- End diff --

I am not sure that just checking the length <= 1 is really that useful. As 
you mentioned, it should start with /; maybe we should add a check satisfying all 
the conditions for a valid ZooKeeper node name, or just remove the check? We 
already have a check later to see whether the node exists.
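
If we do want a fuller check, a sketch using ZooKeeper's own validator (assuming org.apache.zookeeper.common.PathUtils is acceptable as a dependency here):

```java
import org.apache.zookeeper.common.PathUtils;

// Validate the znode path up front instead of only checking its length.
static boolean isValidZkNode(String zkNode) {
    try {
        PathUtils.validatePath(zkNode); // requires a leading '/', rejects illegal characters
        return true;
    } catch (IllegalArgumentException e) {
        return false;
    }
}
```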


---


[GitHub] storm pull request #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-08-31 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1665#discussion_r77034491
  
--- Diff: 
external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
 ---
@@ -373,16 +377,20 @@ private static Options buildOptions () {
 curatorFramework.start();
 String partitionPrefix = "partition_";
 String zkPath = oldKafkaSpoutOffsetQuery.getZkPath();
-if (!zkPath.endsWith("/")) {
-zkPath += "/";
+ if (zkPath.endsWith("/")) {
--- End diff --

Is there a reason for removing the trailing / here and then appending it in the 
loop below? The loop already has a check for 
curatorFramework.checkExists().forPath(path); maybe we can just print and exit 
there? Also, can you call the printUsageAndExit method here to be consistent? 
Or is there a reason you could not use it?
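
As a sketch, the simpler flow suggested above (printUsageAndExit and the curatorFramework check both appear in this file; the message text is a placeholder):

```java
// Inside the loop: if the committed-offsets node is missing, report it
// and exit instead of silently continuing.
if (curatorFramework.checkExists().forPath(path) == null) {
    printUsageAndExit(options, "Zookeeper node " + path + " does not exist");
}
```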


---


[GitHub] storm pull request #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-08-31 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1665#discussion_r77033989
  
--- Diff: 
external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
 ---
@@ -89,6 +89,10 @@ public static void main (String args[]) {
 printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " 
and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required  with " +
 OPTION_OLD_CONSUMER_LONG);
 }
+String zkNode = 
commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG);
--- End diff --

While adding OPTION_ZK_COMMITTED_NODE_LONG to the options, it is added 
using the method options.addOption(String opt, String longOpt, boolean hasArg, 
String description). The third argument is set to true in the call, which 
specifies that the option must have a value. Doesn't that take care of a null 
or empty value? Do we need this explicit check here?
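
For reference, a sketch of the registration in question (the literal option strings are placeholders): with commons-cli, hasArg = true only makes the option take an argument token; an explicitly empty value would still parse.

```java
import org.apache.commons.cli.Options;

// hasArg = true (third argument) requires a value token at parse time,
// but it does not reject an empty string such as --zk-committed-node ""
Options options = new Options();
options.addOption("z", "zk-committed-node", true, "Zookeeper node where committed offsets are stored");
```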


---


[GitHub] storm pull request #1607: STORM-2021: Add missing licenses.

2016-08-04 Thread priyank5485
Github user priyank5485 closed the pull request at:

https://github.com/apache/storm/pull/1607


---


[GitHub] storm issue #1607: STORM-2021: Add missing licenses.

2016-08-04 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1607
  
Closing this in favor of https://github.com/apache/storm/pull/1606/files


---


[GitHub] storm pull request #1607: STORM-2021: Add missing licenses.

2016-08-04 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1607

STORM-2021: Add missing licenses.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-2021

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1607.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1607


commit 05cd5e47173814c050c52a76dbe9577a7aef8e30
Author: Priyank <ps...@hortonworks.com>
Date:   2016-08-04T17:04:31Z

STORM-2021: Add missing licenses.




---


[GitHub] storm issue #1605: STORM-2014: Put logic around dropping messages into Retry...

2016-08-02 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1605
  
+1 (non-binding). I had talked to @hmcl briefly about this. We should have 
his review as well.


---


[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-08-01 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r7301
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
 ---
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
+// object handling zk interaction
+private transient ZKConnection zkConnection;
+// object handling interaction with kinesis
+private transient KinesisConnection kinesisConnection;
+// Kinesis Spout KinesisConfig object
+private transient final KinesisConfig kinesisConfig;
+// Queue of records per shard fetched from kinesis and are waiting to be emitted
+private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
+// Map of records that were fetched from kinesis as a result of failure and are waiting to be emitted
+private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<>();
+// Sequence numbers per shard that have been emitted. LinkedHashSet as we need to remove on ack or fail. At the same time order is needed to figure out the
+// sequence number to commit. Logic explained in commit
+private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
+// sorted acked sequence numbers - needed to figure out what sequence number can be committed
+private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
+// sorted failed sequence numbers - needed to figure out what sequence number can be committed
+private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>();
+// shard iterator corresponding to position in shard for new messages
+private transient Map<String, String> shardIteratorPerShard = new HashMap<>();
+// last fetched sequence number corresponding to position in shard
+private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<>();
+// shard iterator corresponding to position in shard for failed messages
+private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<>();
+// timestamp to decide when to commit to zk again
+private transient long lastCommitTime;
+// boolean to track deactivated state
+private transient boolean deactivated;
+
+KinesisRecordsManager (KinesisConfig kinesisConfig) {
+this.kinesisConfig = kinesisConfig;
+this.zkConnection = new ZKConnection(kinesisConfig.getZkInfo());
+this.kinesisConnection = new KinesisConnection(kinesisConfig.getKinesisConnectionInfo());
+}
+
+void initialize (int myTaskIndex, int totalTasks) {
+deactivated = false;
+lastCommitTime = System.currentTimeMillis();
+kinesisConnection.initialize();
+zkConnection.initialize();
+List<Shard> shards = kinesisConnection.getShardsForStream(kines

---

[GitHub] storm issue #1586: STORM-1839: Storm spout implementation for Amazon Kinesis...

2016-08-01 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1586
  
@harshach I added some more comments in the ack and commit methods for the 
acking logic. I also moved the code interacting with ZooKeeper and Kinesis into 
its own class to make KinesisRecordsManager smaller. Please review when you get 
a chance.


---


[GitHub] storm issue #1586: STORM-1839: Storm spout implementation for Amazon Kinesis...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1586
  
@harshach Thanks for the review. I have addressed your comments and pushed 
a commit.


---


[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72306421
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java 
---
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+public class ZkInfo implements Serializable {
+// comma separated list of zk connect strings to connect to zookeeper 
e.g. localhost:2181
+private final String zkUrl;
+// zk node under which to commit the sequence number of messages. e.g. 
/committed_sequence_numbers
+private final String zkNode;
+// zk session timeout in milliseconds
+private final Integer sessionTimeoutMs;
+// zk connection timeout in milliseconds
+private final Integer connectionTimeoutMs;
+// interval at which to commit offsets to zk in milliseconds
+private final Long commitIntervalMs;
+// number of retry attempts for zk
+private final Integer retryAttempts;
+// time to sleep between retries in milliseconds
+private final Integer retryIntervalMs;
+
+/**
+ * Default constructor that uses defaults for a local setup
+ */
+public ZkInfo () {
+this("localhost:2181", "/kinesisOffsets", 2, 15000, 1L, 3, 
2000);
+}
+
+public ZkInfo (String zkUrl, String zkNode, Integer sessionTimeoutMs, 
Integer connectionTimeoutMs, Long commitIntervalMs, Integer retryAttempts, 
Integer
+retryIntervalMs) {
+this.zkUrl = zkUrl;
+this.zkNode = zkNode;
+this.sessionTimeoutMs = sessionTimeoutMs;
+this.connectionTimeoutMs = connectionTimeoutMs;
+this.commitIntervalMs = commitIntervalMs;
+this.retryAttempts = retryAttempts;
+this.retryIntervalMs = retryIntervalMs;
+validate();
+}
+
+public String getZkUrl() {
+return zkUrl;
+}
+
+public String getZkNode() {
+return zkNode;
+}
+
+public Integer getSessionTimeoutMs() {
+return sessionTimeoutMs;
+}
+
+public Integer getConnectionTimeoutMs() {
+return connectionTimeoutMs;
+}
+
+public Long getCommitIntervalMs() {
+return commitIntervalMs;
+}
+
+public Integer getRetryAttempts() {
+return retryAttempts;
+}
+
+public Integer getRetryIntervalMs() {
+return retryIntervalMs;
+}
+
+private void validate () {
+
+if (zkUrl == null || zkUrl.length() < 1) {
+throw new IllegalArgumentException("zkUrl must be specified to 
connect to zookeeper");
+}
+if (zkNode == null || zkNode.length() < 1) {
+throw new IllegalArgumentException("zkNode must be specified");
+}
+checkPositive(sessionTimeoutMs, "sessionTimeoutMs");
+checkPositive(connectionTimeoutMs, "connectionTimeoutMs");
+checkPositive(commitIntervalMs, "commitIntervalMs");
+checkPositive(retryAttempts, "retryAttempts");
+checkPositive(retryIntervalMs, "retryIntervalMs");
+}
+
+private void checkPositive (Integer argument, String name) {
+if (argument == null && argument <= 0) {
+throw new IllegalArgumentException(name + " must be positive");
+}
+}
+private void checkPositive (Long argument, String name) {
+if (argument == null && argument <= 0) {
+throw new IllegalArgumentException(name + " must be positive");
+}
+}
+
+@Override
+public String toString() {
+r

---

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72306731
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ZkInfo.java 
---
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import java.io.Serializable;
+
+public class ZkInfo implements Serializable {
+// comma separated list of zk connect strings to connect to zookeeper e.g. localhost:2181
+private final String zkUrl;
+// zk node under which to commit the sequence number of messages. e.g. /committed_sequence_numbers
+private final String zkNode;
+// zk session timeout in milliseconds
+private final Integer sessionTimeoutMs;
+// zk connection timeout in milliseconds
+private final Integer connectionTimeoutMs;
+// interval at which to commit offsets to zk in milliseconds
+private final Long commitIntervalMs;
+// number of retry attempts for zk
+private final Integer retryAttempts;
+// time to sleep between retries in milliseconds
+private final Integer retryIntervalMs;
+
+/**
+ * Default constructor that uses defaults for a local setup
+ */
+public ZkInfo () {
+this("localhost:2181", "/kinesisOffsets", 2, 15000, 1L, 3, 
2000);
--- End diff --

Will remove it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72306241
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
 ---
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+private static final Logger LOG = 
LoggerFactory.getLogger(KinesisRecordsManager.class);
+// zk interaction object
+private transient CuratorFramework curatorFramework;
+// Kinesis Spout Config object
+private transient final Config config;
+// Queue of records per shard fetched from kinesis and are waiting to 
be emitted
+private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
+// Map of records  that were fetched from kinesis as a result of 
failure and are waiting to be emitted
+private transient Map<KinesisMessageId, Record> 
failedandFetchedRecords = new HashMap<>();
+// Sequence numbers per shard that have been emitted. A sorted set since we need to remove on ack or fail. At the same time order is needed to figure out the
+// sequence number to commit. Logic explained in commit
+private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
+// sorted acked sequence numbers - needed to figure out what sequence number can be committed
+private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
+// sorted failed sequence numbers - needed to figure out what sequence number can be committed
+private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>();
+// shard iterator corresponding to position in shard for new messages
+private transient Map<String, String> shardIteratorPerShard = new 
HashMap<>();
+// last fetched sequence number corresponding to position in shard
+private transient Map<String, String> fetchedSequenceNumberPerShard = 
new HashMap<>();
+// shard iterator corresponding to position in shard for failed 
messages
+private transient Map<KinesisMessageId, String> 
shardIteratorPerFailedMessage = new HashMap<>();
+// timestamp to decide when to commit to zk again
+private transient long lastCommitTime;
+// boolean to track deactivate
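
The sorted sets above drive the commit decision. As a hedged sketch of the rule the comments describe (illustrative, not the PR's exact code; assumes java.util.TreeSet and java.math.BigInteger): the number safe to commit for a shard is the highest acked sequence number below the earliest still-outstanding emitted or failed one.

    // Sketch: committable sequence number for one shard, or null if none.
    static BigInteger committableSequenceNumber(TreeSet<BigInteger> acked,
                                                TreeSet<BigInteger> emitted,
                                                TreeSet<BigInteger> failed) {
        BigInteger bound = null; // earliest sequence number not yet safely done
        if (!emitted.isEmpty()) {
            bound = emitted.first();
        }
        if (!failed.isEmpty()) {
            bound = (bound == null) ? failed.first() : bound.min(failed.first());
        }
        if (bound == null) {
            // nothing outstanding: the highest ack (if any) can be committed
            return acked.isEmpty() ? null : acked.last();
        }
        return acked.lower(bound); // highest acked strictly below the bound, or null
    }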

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72305349
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
 ---
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+private static final Logger LOG = 
LoggerFactory.getLogger(KinesisRecordsManager.class);
+// zk interaction object
+private transient CuratorFramework curatorFramework;
+// Kinesis Spout Config object
+private transient final Config config;
+// Queue of records per shard fetched from kinesis and are waiting to 
be emitted
+private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
+// Map of records  that were fetched from kinesis as a result of 
failure and are waiting to be emitted
+private transient Map<KinesisMessageId, Record> 
failedandFetchedRecords = new HashMap<>();
+// Sequence numbers per shard that have been emitted. A sorted set since we need to remove on ack or fail. At the same time order is needed to figure out the
+// sequence number to commit. Logic explained in commit
+private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
+// sorted acked sequence numbers - needed to figure out what sequence number can be committed
+private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
+// sorted failed sequence numbers - needed to figure out what sequence number can be committed
+private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>();
+// shard iterator corresponding to position in shard for new messages
+private transient Map<String, String> shardIteratorPerShard = new 
HashMap<>();
+// last fetched sequence number corresponding to position in shard
+private transient Map<String, String> fetchedSequenceNumberPerShard = 
new HashMap<>();
+// shard iterator corresponding to position in shard for failed 
messages
+private transient Map<KinesisMessageId, String> 
shardIteratorPerFailedMessage = new HashMap<>();
+// timestamp to decide when to commit to zk again
+private transient long lastCommitTime;
+// boolean to track deactivate

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72305189
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisRecordsManager.java
 ---
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.zookeeper.CreateMode;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+class KinesisRecordsManager {
+private static final Logger LOG = 
LoggerFactory.getLogger(KinesisRecordsManager.class);
+// zk interaction object
+private transient CuratorFramework curatorFramework;
+// Kinesis Spout Config object
+private transient final Config config;
+// Queue of records per shard fetched from kinesis and are waiting to 
be emitted
+private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<>();
+// Map of records  that were fetched from kinesis as a result of 
failure and are waiting to be emitted
+private transient Map<KinesisMessageId, Record> 
failedandFetchedRecords = new HashMap<>();
+// Sequence numbers per shard that have been emitted. A sorted set since we need to remove on ack or fail. At the same time order is needed to figure out the
+// sequence number to commit. Logic explained in commit
+private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<>();
+// sorted acked sequence numbers - needed to figure out what sequence number can be committed
+private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<>();
+// sorted failed sequence numbers - needed to figure out what sequence number can be committed
+private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<>();
+// shard iterator corresponding to position in shard for new messages
+private transient Map<String, String> shardIteratorPerShard = new 
HashMap<>();
+// last fetched sequence number corresponding to position in shard
+private transient Map<String, String> fetchedSequenceNumberPerShard = 
new HashMap<>();
+// shard iterator corresponding to position in shard for failed 
messages
+private transient Map<KinesisMessageId, String> 
shardIteratorPerFailedMessage = new HashMap<>();
+// timestamp to decide when to commit to zk again
+private transient long lastCommitTime;
+// boolean to track deactivate

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72303034
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
 ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class KinesisConnectionInfo implements Serializable {
+private final byte[] serializedKinesisCredsProvider;
+private final byte[] serializedkinesisClientConfig;
+private final Integer recordsLimit;
+private final Regions region;
+
+private transient AWSCredentialsProvider credentialsProvider;
+private transient ClientConfiguration clientConfiguration;
+
+/**
+ *
+ * @param credentialsProvider implementation to provide credentials to 
connect to kinesis
+ * @param clientConfiguration client configuration to pass to kinesis 
client
+ * @param region region to connect to
+ * @param recordsLimit max records to be fetched in a getRecords 
request to kinesis
+ */
+public KinesisConnectionInfo (AWSCredentialsProvider 
credentialsProvider, ClientConfiguration clientConfiguration, Regions region, 
Integer recordsLimit) {
+if (recordsLimit == null || recordsLimit <= 0) {
+throw new IllegalArgumentException("recordsLimit has to be a 
positive integer");
+}
+if (region == null) {
+throw new IllegalArgumentException("region cannot be null");
+}
+serializedKinesisCredsProvider = 
getKryoSerializedBytes(credentialsProvider);
+serializedkinesisClientConfig = 
getKryoSerializedBytes(clientConfiguration);
+this.recordsLimit = recordsLimit;
+this.region = region;
+
+this.credentialsProvider = null;
+this.clientConfiguration = null;
+}
+
+public Integer getRecordsLimit() {
+return recordsLimit;
+}
+
+public AWSCredentialsProvider getCredentialsProvider() {
+if (credentialsProvider == null) {
+credentialsProvider = (AWSCredentialsProvider) 
this.getKryoDeserializedObject(serializedKinesisCredsProvider);
+}
+return credentialsProvider;
+}
+
+public ClientConfiguration getClientConfiguration() {
+if (clientConfiguration == null) {
+clientConfiguration = (ClientConfiguration) 
this.getKryoDeserializedObject(serializedkinesisClientConfig);
+}
+return clientConfiguration;
+}
+
+public Regions getRegion() {
+return region;
+}
+
+private byte[] getKryoSerializedBytes (final Object obj) {
+final Kryo kryo = new Kryo();
+final ByteArrayOutputStream os = new ByteArrayOutputStream();
+final Output output = new Output(os);
+kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+kryo.writeClassAndObject(output, obj);
+output.flush();
+return os.toByteArray();
+}
+
+private Object getKryoDeserializedObject (final byte[] ser) {
+final Kryo kryo = new Kryo();
+final Input input = new Input(new ByteArrayInputStream(ser));
+kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
+return kryo.readClassAndObject(input);
+}
+
+ 
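
Since the class is Serializable but keeps the provider and client config only as Kryo bytes, a round trip looks roughly like this (a sketch: DefaultAWSCredentialsProviderChain and the region are example choices, and commons-lang3 SerializationUtils merely stands in for whatever Java serialization ships the object to workers):

    KinesisConnectionInfo connectionInfo = new KinesisConnectionInfo(
            new DefaultAWSCredentialsProviderChain(), // any AWSCredentialsProvider
            new ClientConfiguration(),
            Regions.US_WEST_2,                        // example region
            1000);                                    // records per getRecords call
    byte[] bytes = SerializationUtils.serialize(connectionInfo);
    KinesisConnectionInfo onWorker = (KinesisConnectionInfo) SerializationUtils.deserialize(bytes);
    // the getter lazily Kryo-deserializes the provider from the stored bytes
    AWSCredentialsProvider provider = onWorker.getCredentialsProvider();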

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72302997
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/KinesisConnectionInfo.java
 ---
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+
+public class KinesisConnectionInfo implements Serializable {
+private final byte[] serializedKinesisCredsProvider;
+private final byte[] serializedkinesisClientConfig;
+private final Integer recordsLimit;
+private final Regions region;
+
+private transient AWSCredentialsProvider credentialsProvider;
+private transient ClientConfiguration clientConfiguration;
+
+/**
+ *
+ * @param credentialsProvider implementation to provide credentials to 
connect to kinesis
+ * @param clientConfiguration client configuration to pass to kinesis 
client
+ * @param region region to connect to
+ * @param recordsLimit max records to be fetched in a getRecords 
request to kinesis
+ */
+public KinesisConnectionInfo (AWSCredentialsProvider 
credentialsProvider, ClientConfiguration clientConfiguration, Regions region, 
Integer recordsLimit) {
+if (recordsLimit == null || recordsLimit <= 0) {
+throw new IllegalArgumentException("recordsLimit has to be a 
positive integer");
+}
+if (region == null) {
+throw new IllegalArgumentException("region cannot be null");
+}
+serializedKinesisCredsProvider = 
getKryoSerializedBytes(credentialsProvider);
+serializedkinesisClientConfig = 
getKryoSerializedBytes(clientConfiguration);
+this.recordsLimit = recordsLimit;
+this.region = region;
+
+this.credentialsProvider = null;
--- End diff --

Will change it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72291643
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
 ---
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ExponentialBackoffRetrier implements 
FailedMessageRetryHandler, Serializable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+// Wait interval for retrying after first failure
+private final Long initialDelayMillis;
+// Base for exponential function in seconds for retrying for second, 
third and so on failures
+private final Long baseSeconds;
+// Maximum number of retries
+private final Long maxRetries;
+// map to track number of failures for each kinesis message that failed
+private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
+// map to track next retry time for each kinesis message that failed
+private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
+// sorted set of records to be retried based on retry time. earliest retryTime record comes first
+private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
+
+/**
+ * no args constructor that uses defaults of 100 ms for first retry, 
max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) 
secs for
+ * retry i where i = 2,3,
+ */
+public ExponentialBackoffRetrier () {
+this(100L, 2L, Long.MAX_VALUE);
+}
+
+/**
+ *
+ * @param initialDelayMillis delay in milliseconds for first retry
+ * @param baseSeconds base for exponent function in seconds
+ * @param maxRetries maximum number of retries before the record is 
discarded/acked
+ */
+public ExponentialBackoffRetrier (Long initialDelayMillis, Long 
baseSeconds, Long maxRetries) {
+this.initialDelayMillis = initialDelayMillis;
+this.baseSeconds = baseSeconds;
+this.maxRetries = maxRetries;
+validate();
+}
+
+private void validate () {
+if (initialDelayMillis < 0) {
+throw new IllegalArgumentException("initialDelayMillis cannot 
be negative." );
+}
+if (baseSeconds < 0) {
+throw new IllegalArgumentException("baseSeconds cannot be 
negative.");
+}
+if (maxRetries < 0) {
+throw new IllegalArgumentException("maxRetries cannot be 
negative.");
+}
+}
+@Override
+public boolean failed(KinesisMessageId messageId) {
+LOG.debug("Handling failed message " + messageId);
+// if maxRetries is 0, don't retry and return false as per interface contract
+if (maxRetries == 0) {
+LOG.debug("maxRetries set to 0. Hence not queueing " + 
messageId);
+return false;
+}
+// if first failure add it to the count map
+if (!failCounts.containsKey(messageId)) {
+failCounts.put(messageId, 0L);
+}
+// increment the fail count as we started with 0
+Long failCount = failCounts.get(messageId);
+failCounts.put(messageId, ++failCount);
+// if fail count is greater than maxRetries, discard or ack. for 
e.g. for maxRetries 3, 4 failures are allowed at maximum
+if (failCount > maxRetries) {
+LOG.debug("maxR

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72292758
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
 ---
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ExponentialBackoffRetrier implements 
FailedMessageRetryHandler, Serializable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+// Wait interval for retrying after first failure
+private final Long initialDelayMillis;
+// Base for exponential function in seconds for retrying for second, 
third and so on failures
+private final Long baseSeconds;
+// Maximum number of retries
+private final Long maxRetries;
+// map to track number of failures for each kinesis message that failed
+private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
+// map to track next retry time for each kinesis message that failed
+private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
+// sorted set of records to be retried based on retry time. earliest retryTime record comes first
+private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
--- End diff --

No. Messages are not stored anywhere in ExponentialBackoffRetrier. 
retryTimes is just used to implement the comparator interface that will be used 
by retryMessageSet TreeSet to order the messages on their next retry time. You 
can check out the RetryTimeComparator in this class
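
For readers without the full diff, a comparator along these lines would do what is described above (a sketch; the PR's RetryTimeComparator may differ, e.g. in tie-breaking):

    // Sketch: order messages by their next retry time, looked up in retryTimes.
    private class RetryTimeComparator implements Serializable, Comparator<KinesisMessageId> {
        @Override
        public int compare(KinesisMessageId a, KinesisMessageId b) {
            // note: a TreeSet treats compare() == 0 as duplicates, so distinct ids
            // with identical retry times would need a tie-breaker in a real implementation
            return Long.compare(retryTimes.get(a), retryTimes.get(b));
        }
    }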


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72292144
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
 ---
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ExponentialBackoffRetrier implements 
FailedMessageRetryHandler, Serializable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+// Wait interval for retrying after first failure
+private final Long initialDelayMillis;
--- End diff --

ExponentialBackoffRetrier has two constructors. The no args constructor 
will set the default values for these variables.
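
To make the documented schedule concrete (first retry after initialDelayMillis, retry i after baseSeconds^(i-1) seconds), the next retry time could be computed like this (a sketch only; the PR's arithmetic may differ):

    // failCount is 1 for the first retry, 2 for the second, and so on.
    long nextRetryTimeMillis(long failCount, long nowMillis) {
        if (failCount == 1) {
            return nowMillis + initialDelayMillis;
        }
        return nowMillis + 1000L * (long) Math.pow(baseSeconds, failCount - 1);
    }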


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72291939
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
 ---
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ExponentialBackoffRetrier implements 
FailedMessageRetryHandler, Serializable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+// Wait interval for retrying after first failure
+private final Long initialDelayMillis;
+// Base for exponential function in seconds for retrying for second, 
third and so on failures
+private final Long baseSeconds;
+// Maximum number of retries
+private final Long maxRetries;
+// map to track number of failures for each kinesis message that failed
+private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
+// map to track next retry time for each kinesis message that failed
+private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
+// sorted set of records to be retried based on retry time. earliest retryTime record comes first
+private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
+
+/**
+ * no args constructor that uses defaults of 100 ms for first retry, 
max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) 
secs for
+ * retry i where i = 2,3,
+ */
+public ExponentialBackoffRetrier () {
+this(100L, 2L, Long.MAX_VALUE);
+}
+
+/**
+ *
+ * @param initialDelayMillis delay in milliseconds for first retry
+ * @param baseSeconds base for exponent function in seconds
+ * @param maxRetries maximum number of retries before the record is 
discarded/acked
+ */
+public ExponentialBackoffRetrier (Long initialDelayMillis, Long 
baseSeconds, Long maxRetries) {
+this.initialDelayMillis = initialDelayMillis;
+this.baseSeconds = baseSeconds;
+this.maxRetries = maxRetries;
+validate();
+}
+
+private void validate () {
+if (initialDelayMillis < 0) {
+throw new IllegalArgumentException("initialDelayMillis cannot 
be negative." );
+}
+if (baseSeconds < 0) {
+throw new IllegalArgumentException("baseSeconds cannot be 
negative.");
+}
+if (maxRetries < 0) {
+throw new IllegalArgumentException("maxRetries cannot be 
negative.");
+}
+}
+@Override
+public boolean failed(KinesisMessageId messageId) {
+LOG.debug("Handling failed message " + messageId);
+// if maxRetries is 0, don't retry and return false as per interface contract
+if (maxRetries == 0) {
+LOG.debug("maxRetries set to 0. Hence not queueing " + 
messageId);
--- End diff --

I will change it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72291774
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
 ---
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ExponentialBackoffRetrier implements 
FailedMessageRetryHandler, Serializable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+// Wait interval for retrying after first failure
+private final Long initialDelayMillis;
+// Base for exponential function in seconds for retrying for second, 
third and so on failures
+private final Long baseSeconds;
+// Maximum number of retries
+private final Long maxRetries;
+// map to track number of failures for each kinesis message that failed
+private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
+// map to track next retry time for each kinesis message that failed
+private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
+// sorted set of records to be retried based on retry time. earliest retryTime record comes first
+private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
+
+/**
+ * no args constructor that uses defaults of 100 ms for first retry, 
max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2,i-1) 
secs for
+ * retry i where i = 2,3,
+ */
+public ExponentialBackoffRetrier () {
+this(100L, 2L, Long.MAX_VALUE);
+}
+
+/**
+ *
+ * @param initialDelayMillis delay in milliseconds for first retry
+ * @param baseSeconds base for exponent function in seconds
+ * @param maxRetries maximum number of retries before the record is 
discarded/acked
+ */
+public ExponentialBackoffRetrier (Long initialDelayMillis, Long 
baseSeconds, Long maxRetries) {
+this.initialDelayMillis = initialDelayMillis;
+this.baseSeconds = baseSeconds;
+this.maxRetries = maxRetries;
+validate();
+}
+
+private void validate () {
+if (initialDelayMillis < 0) {
+throw new IllegalArgumentException("initialDelayMillis cannot 
be negative." );
+}
+if (baseSeconds < 0) {
+throw new IllegalArgumentException("baseSeconds cannot be 
negative.");
+}
+if (maxRetries < 0) {
+throw new IllegalArgumentException("maxRetries cannot be 
negative.");
+}
+}
+@Override
+public boolean failed(KinesisMessageId messageId) {
+LOG.debug("Handling failed message " + messageId);
+// if maxRetries is 0, don't retry and return false as per interface contract
+if (maxRetries == 0) {
+LOG.debug("maxRetries set to 0. Hence not queueing " + 
messageId);
+return false;
+}
+// if first failure add it to the count map
+if (!failCounts.containsKey(messageId)) {
+failCounts.put(messageId, 0L);
+}
+// increment the fail count as we started with 0
+Long failCount = failCounts.get(messageId);
+failCounts.put(messageId, ++failCount);
+// if fail count is greater than maxRetries, discard or ack. for 
e.g. for maxRetries 3, 4 failures are allowed at maximum
+if (failCount > maxRetries) {
+LOG.debug("maxRet

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72291464
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java 
---
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class Config implements Serializable {
+// kinesis stream name to read from
+private final String streamName;
+// shard iterator type based on kinesis api - beginning of time, 
latest, at timestamp are only supported
+private final ShardIteratorType shardIteratorType;
+// implementation for converting a Kinesis record to a storm tuple
+private final RecordToTupleMapper recordToTupleMapper;
+// timestamp to be used for shardIteratorType AT_TIMESTAMP - can be 
null
+private final Date timestamp;
+// implementation for handling the failed messages retry logic
+private final FailedMessageRetryHandler failedMessageRetryHandler;
+// object capturing all zk related information for storing committed 
sequence numbers
+private final ZkInfo zkInfo;
+// object representing information on parameters to use while connecting to kinesis using kinesis client
+private final KinesisConnectionInfo kinesisConnectionInfo;
+// this number represents the number of messages that are still not committed to zk. it will prevent the spout from emitting further.
+// for e.g. if 1 failed and 2,3,4,5 have all been acked by storm, they still can't be committed to zk because 1 is still in the failed set. As a result
+// the acked queue can grow without bound with none of them being committed to zk. topology max pending does not help since from storm's view they are acked
+private final Long maxUncommittedRecords;
+
+public Config (String streamName, ShardIteratorType shardIteratorType, 
RecordToTupleMapper recordToTupleMapper, Date timestamp, 
FailedMessageRetryHandler
+failedMessageRetryHandler, ZkInfo zkInfo, 
KinesisConnectionInfo kinesisConnectionInfo, Long maxUncommittedRecords) {
+this.streamName = streamName;
+this.shardIteratorType = shardIteratorType;
+this.recordToTupleMapper = recordToTupleMapper;
+this.timestamp = timestamp;
+this.failedMessageRetryHandler = failedMessageRetryHandler;
+this.zkInfo = zkInfo;
+this.kinesisConnectionInfo = kinesisConnectionInfo;
+this.maxUncommittedRecords = maxUncommittedRecords;
+validate();
+}
+
+private void validate () {
+if (streamName == null || streamName.length() < 1) {
+throw new IllegalArgumentException("streamName is required and 
cannot be of length 0.");
+}
+if (shardIteratorType == null || 
shardIteratorType.equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || 
shardIteratorType.equals(ShardIteratorType
+.AT_SEQUENCE_NUMBER)) {
+throw new IllegalArgumentException("shardIteratorType has to 
be one of the " + ShardIteratorType.AT_TIMESTAMP + "," + 
ShardIteratorType.LATEST +
+"," + ShardIteratorType.TRIM_HORIZON);
+}
+if (shardIteratorType.equals(ShardIteratorType.AT_TIMESTAMP) && 
timestamp == null) {
+throw new IllegalArgumentException("timestamp must be provided 
if shardIteratorType is " + ShardIteratorType.AT_TIMESTAMP);
+}
+if (recordToTupleMapper == null) {
+throw new IllegalArgumentException("recordToTupleMapper cannot 
be null");
+}
+if (failedMessageRetryHandler == null) {
   
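
Putting the quoted pieces together, constructing the config might look like this (illustrative wiring only; myRecordToTupleMapper and myKinesisConnectionInfo are hypothetical placeholders for user-supplied implementations):

    Config spoutConfig = new Config(
            "my-kinesis-stream",             // streamName (example)
            ShardIteratorType.TRIM_HORIZON,  // start from the oldest available records
            myRecordToTupleMapper,           // RecordToTupleMapper implementation
            null,                            // timestamp: only required for AT_TIMESTAMP
            new ExponentialBackoffRetrier(), // FailedMessageRetryHandler
            new ZkInfo(),                    // local-setup zk defaults
            myKinesisConnectionInfo,         // credentials, client config, region
            10000L);                         // maxUncommittedRecords (example)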

[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-26 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1586#discussion_r72291240
  
--- Diff: 
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/Config.java 
---
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Date;
+
+public class Config implements Serializable {
--- End diff --

Will change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1586: STORM-1839: Storm spout implementation for Amazon ...

2016-07-23 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1586

STORM-1839: Storm spout implementation for Amazon Kinesis Streams.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1839

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1586


commit de68c267fcb7555c7729c9377d3f6d1e504ec25e
Author: Priyank <ps...@hortonworks.com>
Date:   2016-07-12T19:17:54Z

STORM-1839: Storm spout implementation for Amazon Kinesis Streams.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1576: Kafka Spout New Consumer API - KafkaSpoutRetryExponential...

2016-07-19 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1576
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1576: Kafka Spout New Consumer API - KafkaSpoutRetryExponential...

2016-07-19 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1576
  
@hmcl One minor thing. Should we handle long overflows?
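
For context, overflow handling in backoff arithmetic is typically done by clamping rather than letting the sum wrap negative (illustrative; not from the PR, and Java's Math.addExact would throw instead of clamping):

    // Saturating add: clamp to Long.MAX_VALUE / Long.MIN_VALUE instead of wrapping.
    static long saturatingAdd(long a, long b) {
        long sum = a + b;
        // overflow occurred iff both operands differ in sign from the sum
        if (((a ^ sum) & (b ^ sum)) < 0) {
            return a > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
        return sum;
    }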


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1505: STORM-1136: Command line module to return kafka sp...

2016-06-20 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1505

STORM-1136: Command line module to return kafka spout offsets lag and…

… display in storm ui.

STORM-1136: Missed file in previous commit

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1136-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1505.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1505


commit 482d96ca9d6a3cb681ea0e393ffade2ebd3d93a2
Author: Priyank <ps...@hortonworks.com>
Date:   2016-05-18T19:14:50Z

STORM-1136: Command line module to return kafka spout offsets lag and 
display in storm ui.

STORM-1136: Missed file in previous commit




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

2016-06-09 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1451
  
@abellina I have added a comment for that div. As far as renaming it to topology-spout-stats is concerned, I prefer the current name: right below that div there is a div showing stats for the spout, like complete latency, etc., and I don't want the two to be confused.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

2016-06-09 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1451#discussion_r66499861
  
--- Diff: storm-core/src/ui/public/topology.html ---
@@ -384,8 +389,51 @@ Topology 
resources
 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
 $('#topology-actions [data-toggle="tooltip"]').tooltip();
 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
+
+var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
+$.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
+  if (lagResponse !== null && lagResponse !== undefined && 
lagResponse instanceof Array && lagResponse.length > 0) {
+var kafkaSpoutsLagTemplate = 
$(template).filter("#topology-kafka-spouts-lag-template").html();
+var spoutsErrorTemplate = 
$(template).filter("#topology-spouts-lag-error-template").html();
+
+var kafkaSpoutLags = lagResponse.filter(function(ele) 
{return ele.spoutType === "KAFKA";});
+var isJson = function (input) {
+  try {
+JSON.parse(input);
+  } catch (e) {
+return false;
+  }
+  return true;
+};
+var kafkaSpoutsValidResults = 
kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
+var kafkaSpoutsErrorResults = 
kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
+var data = {};
+if (kafkaSpoutsValidResults.length > 0) {
+  data.kafkaSpoutsLagResults = [];
+  kafkaSpoutsValidResults.forEach(function(ele) {
+var spoutLagResult = JSON.parse(ele.spoutLagResult);
+spoutLagResult.forEach(function(ele2) {
--- End diff --

@abellina The spoutLagResult property is actually an array. Hence the 
flattening. 

Let me elaborate on the limitations of the valid/invalid approach. The TopologySpoutLag class in storm-core, called by the ui server, does not know whether the spoutLagResult is valid or not. The reason is that lag information for kafka and other spouts is fetched through a shell call to an external module, since we did not want any direct dependency on storm core. For now we have kafka, but we can add other types of spouts as well. Plus the fields in ui or the template for a different type of spout could be different. The only commonality I found was that if something goes wrong while getting lag info for the underlying spout (kafka or other) then an error message is sent in the response. This will work for all types of spouts. To do what you are saying we would have to inspect the response in TopologySpoutLag class. I felt it did not matter whether ui or server did that, and I preferred ui. Let me know if it makes sense or not and if you still think we need to change something.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

2016-06-08 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1451
  
@harshach Rebased 1.x branch since there were some merges since the PR was 
raised.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

2016-06-08 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1451
  
@abellina The tables are optional already. If there are no kafka spouts 
then the response will be an empty array. Can you try running a non kafka spout 
topology and check? I did.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

2016-06-08 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1451#discussion_r66382723
  
--- Diff: storm-core/src/ui/public/topology.html ---
@@ -384,8 +389,51 @@ Topology resources
 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
 $('#topology-actions [data-toggle="tooltip"]').tooltip();
 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
+
+var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
+$.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
+  if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
+    var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
+    var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
+
+    var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
+    var isJson = function (input) {
+      try {
+        JSON.parse(input);
+      } catch (e) {
+        return false;
+      }
+      return true;
+    };
+    var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
+    var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
+    var data = {};
+    if (kafkaSpoutsValidResults.length > 0) {
+      data.kafkaSpoutsLagResults = [];
+      kafkaSpoutsValidResults.forEach(function(ele) {
+        var spoutLagResult = JSON.parse(ele.spoutLagResult);
+        spoutLagResult.forEach(function(ele2) {
--- End diff --

We would still need the for loop since we need to flatten the results out. I
chose to use id in the JSON passed to the template because the template uses
id in its other tables in the UI. If you are fine with that we can leave it
as is.


---


[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

2016-06-08 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1451#discussion_r66382357
  
--- Diff: storm-core/src/ui/public/topology.html ---
@@ -384,8 +389,51 @@ Topology resources
 $('#topology-configuration [data-toggle="tooltip"]').tooltip();
 $('#topology-actions [data-toggle="tooltip"]').tooltip();
 $('#topology-visualization [data-toggle="tooltip"]').tooltip();
+
+var lagUrl = "/api/v1/topology/"+topologyId+"/lag";
+$.getJSON(lagUrl,function(lagResponse,status,jqXHR) {
+  if (lagResponse !== null && lagResponse !== undefined && lagResponse instanceof Array && lagResponse.length > 0) {
+    var kafkaSpoutsLagTemplate = $(template).filter("#topology-kafka-spouts-lag-template").html();
+    var spoutsErrorTemplate = $(template).filter("#topology-spouts-lag-error-template").html();
+
+    var kafkaSpoutLags = lagResponse.filter(function(ele) {return ele.spoutType === "KAFKA";});
+    var isJson = function (input) {
+      try {
+        JSON.parse(input);
+      } catch (e) {
+        return false;
+      }
+      return true;
+    };
+    var kafkaSpoutsValidResults = kafkaSpoutLags.filter(function (ele) {return isJson(ele.spoutLagResult);});
+    var kafkaSpoutsErrorResults = kafkaSpoutLags.filter(function (ele) {return !isJson(ele.spoutLagResult);});
--- End diff --

@abellina If it's an error we only want to show an appropriate message to the
user as a string. Also, note that the top-level response is still JSON. I
think a string is fine here.
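
To make that shape concrete, here is a hypothetical sketch of the envelope
using json-simple (spoutType and spoutLagResult come from the diff; spoutId
and the error text are illustrative assumptions): the outer response is
always JSON, while spoutLagResult is an opaque string that may or may not
parse.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.json.simple.JSONValue;

    public class LagResponseSketch {
        public static void main(String[] args) {
            Map<String, Object> entry = new HashMap<>();
            entry.put("spoutId", "kafkaSpout");   // hypothetical field
            entry.put("spoutType", "KAFKA");
            // A plain error string instead of a JSON lag array:
            entry.put("spoutLagResult", "Unable to get offset lags for kafka.");
            List<Object> response = new ArrayList<>();
            response.add(entry);
            System.out.println(JSONValue.toJSONString(response));
        }
    }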


---


[GitHub] storm issue #1451: STORM-1136: Command line module to return kafka spout off...

2016-06-08 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1451
  
opened https://issues.apache.org/jira/browse/STORM-1891 to track trident
kafka spouts


---


[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

2016-06-02 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1451#discussion_r65582081
  
--- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying offset lag for kafka spout
+ */
+public class KafkaOffsetLagUtil {
+    private static final String OPTION_TOPIC_SHORT = "t";
+    private static final String OPTION_TOPIC_LONG = "topics";
+    private static final String OPTION_OLD_CONSUMER_SHORT = "o";
+    private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
+    private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
+    private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
+    private static final String OPTION_GROUP_ID_SHORT = "g";
+    private static final String OPTION_GROUP_ID_LONG = "groupid";
+    private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
+    private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
+    private static final String OPTION_PARTITIONS_SHORT = "p";
+    private static final String OPTION_PARTITIONS_LONG = "partitions";
+    private static final String OPTION_LEADERS_SHORT = "l";
+    private static final String OPTION_LEADERS_LONG = "leaders";
+    private static final String OPTION_ZK_SERVERS_SHORT = "z";
+    private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
+    private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
+    private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
+    private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
+    private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
+
+    public static void main (String args[]) {
+        try {
+            List results;
+            Options options = buildOptions();
+            CommandLineParser parser = new DefaultParser();
+            CommandLine commandLine = parser.parse(options, args);
+            if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
+                printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
+            }
+            if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
+                OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
+                if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
+                    printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_

[GitHub] storm pull request #1451: STORM-1136: Command line module to return kafka sp...

2016-06-02 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1451#discussion_r65581276
  
--- Diff: external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.monitor;
+
+import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.json.simple.JSONValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for querying offset lag for kafka spout
+ */
+public class KafkaOffsetLagUtil {
+private static final String OPTION_TOPIC_SHORT = "t";
--- End diff --

I think it's just a convenience. You can use either -t or --topics.
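
A minimal commons-cli sketch of that convenience, using only classes already
imported in this diff: one Option registers both a short and a long name, so
-t and --topics are interchangeable on the command line.

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.DefaultParser;
    import org.apache.commons.cli.Options;

    public class ShortLongOptionDemo {
        public static void main(String[] args) throws Exception {
            Options options = new Options();
            options.addOption("t", "topics", true, "comma separated list of topics");
            CommandLine cmd = new DefaultParser().parse(options, args);
            // Lookup by either name resolves to the same option value.
            System.out.println(cmd.getOptionValue("t"));
            System.out.println(cmd.getOptionValue("topics"));
        }
    }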


---


[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

2016-05-31 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/1451
  
@harshach Done.


---


[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

2016-05-31 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/1451
  
@harshach Will do that. Just to confirm, you mean the section header in the
UI under Topology Stats, right? Also, since a topology can technically have
more than one kafka spout and we are showing lags for all of them there, I
was thinking of keeping it plural, maybe "Kafka Spouts Lag".

Does that sound okay?


---


[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

2016-05-31 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/1451
  
@HeartSaVioR I just attached the two screenshots. One shows the case where
the lag result was fetched successfully; the other was taken after stopping
kafka, in which case the UI just shows a message indicating the cause.


---


[GitHub] storm pull request: STORM-1136: Command line module to return kafka spout of...

2016-05-31 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/1451
  

![kafkaspoutlags_success](https://cloud.githubusercontent.com/assets/5192436/15682583/402a1180-2713-11e6-9ad2-e101cbb1ea4c.png)

![kafkaspoutlags_failure](https://cloud.githubusercontent.com/assets/5192436/15682586/43807626-2713-11e6-8609-870774dbc511.png)



---


[GitHub] storm pull request: STORM-1136: Command line module to return kafk...

2016-05-30 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1451

STORM-1136: Command line module to return kafka spout offsets lag and…

… display in storm ui.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1136

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1451.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1451


commit 83b70af3d2a419e81ea9dc8a962dfccbda2a3255
Author: Priyank <ps...@hortonworks.com>
Date:   2016-05-18T19:14:50Z

STORM-1136: Command line module to return kafka spout offsets lag and 
display in storm ui.




---


[GitHub] storm pull request: STORM-1535: Make sure hdfs key tab login happe...

2016-04-21 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/1351#issuecomment-213021857
  
Thanks for catching that @satishd Removed the comment.


---


[GitHub] storm pull request: STORM-1535: Make sure hdfs key tab login happe...

2016-04-20 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1351

STORM-1535: Make sure hdfs key tab login happens only once for multip…

…le bolts/executors.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1535

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1351.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1351


commit bc488c23420215db847fd8dde5a1006e319521de
Author: Priyank <ps...@hortonworks.com>
Date:   2016-03-31T19:46:25Z

STORM-1535: Make sure hdfs key tab login happens only once for multiple 
bolts/executors.




---


[GitHub] storm pull request: STORM-1706: Add RELEASE and storm-env.sh to st...

2016-04-12 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1334

STORM-1706: Add RELEASE and storm-env.sh to storm-diet assembly



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1706

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1334.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1334


commit 5e62b7b36283c1e257395c59237e699067298bd1
Author: Priyank <ps...@hortonworks.com>
Date:   2016-04-12T18:46:21Z

STORM-1706: Add RELEASE and storm-env.sh to storm-diet assembly




---


[GitHub] storm pull request: STORM-1129: Update ui to use topology name

2016-03-30 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/1277#issuecomment-203751947
  
@abhishekagarwal87 Please look at the $.getJSON method, line 190 in
component.html. Now, instead of topologyId being assigned in onReady, I do it
in $.getJSON. It's a side effect of squashing commits that makes it hard to
look at the commits that addressed the review comments. Sorry about that. Do
you guys usually keep separate commits and squash at the end? I will do that
next time.


---


[GitHub] storm pull request: STORM-1668: Fix silent failing of flux for set...

2016-03-30 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1282

STORM-1668: Fix silent failing of flux for setting a non-existent pro…

…perty.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1668-1.x

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1282


commit be09c3c1fd05ec1ed8f8d7768c4023691946d0e2
Author: Priyank <ps...@hortonworks.com>
Date:   2016-03-30T23:23:05Z

STORM-1668: Fix silent failing of flux for setting a non-existent property.




---


[GitHub] storm pull request: STORM-1668: Fix silent failing of flux for set...

2016-03-30 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1281

STORM-1668: Fix silent failing of flux for setting a non-existent pro…

…perty.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1668

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1281


commit 436d7752d9e8deca5b4d83f502eb98491548896f
Author: Priyank <ps...@hortonworks.com>
Date:   2016-03-30T23:23:05Z

STORM-1668: Fix silent failing of flux for setting a non-existent property.




---


[GitHub] storm pull request: STORM-1129: Update ui to use topology name

2016-03-30 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1277#discussion_r57950485
  
--- Diff: storm-core/src/clj/org/apache/storm/ui/core.clj ---
@@ -466,6 +466,21 @@
"assignedCpu" (.get_assigned_cpu t)})
 "schedulerDisplayResource" (*STORM-CONF* 
Config/SCHEDULER_DISPLAY_RESOURCE)}))
 
+(defn get-topology-id [topology-name-or-id]
+  (let [summary ((all-topologies-summary) "topologies")
+filter-fun-name (fn[topo-summary] (= (topo-summary "name") 
topology-name-or-id))
+filter-fun-id (fn[topo-summary] (= (topo-summary "id") 
topology-name-or-id))
+matching-topologies-by-name (filter filter-fun-name summary)
+matching-topologies-by-id (filter filter-fun-id summary)
+matching-topologies
+(cond
+  (not-empty matching-topologies-by-name) 
matching-topologies-by-name
+  (not-empty matching-topologies-by-id) matching-topologies-by-id
+  :else nil)
+_ (when (empty? matching-topologies) (throw (NotAliveException. 
(str topology-name-or-id " is not alive"
+id ((first matching-topologies) "id")]
+id))
--- End diff --

@knusbaum That's true. But if we prioritized ids, wouldn't your topology
become inaccessible? I think going forward we should use topology names in
the UI and elsewhere so that we can have permanent links, since that is the
identifier given by the user. Support for lookup by id was added for
backwards compatibility, as suggested by @revans2 and others.


---


[GitHub] storm pull request: STORM-1129: Update ui to use topology name

2016-03-30 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/1277#issuecomment-203557013
  
@abhishekagarwal87 Thanks for the catch. I have updated the PR. Can you 
please review?


---


[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...

2016-03-29 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/854#issuecomment-203246876
  
@harshach @revans2 I have addressed the comments  and raised a PR 
https://github.com/apache/storm/pull/1277 against the 1.x branch as I could not 
test my branch off of master due to another issue. Can you please review the PR 
against 1.x branch? I will port the same to master by updating this PR.


---


[GitHub] storm pull request: STORM-1129: Update ui to use topology name

2016-03-29 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1277

STORM-1129: Update ui to use topology name



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1129-1.x

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1277.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1277


commit 2648d74cad7d1e46ec9bcceb9b3b388cb98cfc8d
Author: Priyank <ps...@hortonworks.com>
Date:   2016-03-29T05:31:08Z

STORM-1129: Update ui to use topology name




---


[GitHub] storm pull request: STORM-1654 HBaseBolt creates tick tuples with ...

2016-03-24 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/1251#issuecomment-200919442
  
+1 (NB). @HeartSaVioR I had created
https://issues.apache.org/jira/browse/STORM-1626
I will close that one since it's a duplicate of STORM-1654. Please let me
know if that's not the case.


---


[GitHub] storm pull request: [STORM-1449] Fix Kafka spout to maintain backw...

2016-01-19 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/994#issuecomment-173024682
  
@revans2 I am fine with the changes except for one major concern pointed out
by @Parth-Brahmbhatt. I confirmed this with him, and our main concern is the
rolling upgrade feature for topologies with a kafka spout. That would not
work anymore.


---


[GitHub] storm pull request: [STORM-1449] Fix Kafka spout to maintain backw...

2016-01-14 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/994#issuecomment-171576791
  
Just ran into an issue where this fix does not work. Correct me if I am
wrong, but a fat storm jar compiled against an older version of storm-kafka
will still use byte[], while the newer version of storm now uses ByteBuffer.
Hence it throws an exception at runtime if storm is upgraded. I think an
upgrade of storm should still work with older topology jars.
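
A rough sketch of the incompatibility being described, with hypothetical
interface names standing in for the old and new storm-kafka signatures: the
fat jar was compiled against the byte[] method, so it fails at runtime (e.g.
with a NoSuchMethodError) once the upgraded cluster only provides ByteBuffer.

    import java.nio.ByteBuffer;
    import java.util.List;

    // What the old fat topology jar was compiled against:
    interface OldScheme {
        List<Object> deserialize(byte[] ser);
    }

    // What the upgraded storm now ships; the old jar's call sites still
    // reference deserialize(byte[]), which no longer exists:
    interface NewScheme {
        List<Object> deserialize(ByteBuffer ser);
    }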


---


[GitHub] storm pull request: STORM-1458: Add check to see if nimbus is alre...

2016-01-08 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/998

STORM-1458: Add check to see if nimbus is already running.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1458

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/998.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #998


commit d62281629afe02a34aba10633c36be546eb70a74
Author: Priyank <ps...@hortonworks.com>
Date:   2016-01-08T21:27:03Z

STORM-1458: Add check to see if nimbus is already running.




---


[GitHub] storm pull request: STORM-1160: Add hadoop-auth dependency needed ...

2016-01-08 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/1000

STORM-1160: Add hadoop-auth dependency needed for storm-core

hadoop-auth is needed for storm ui in a secured cluster for
authentication. Adding the dependency back since it was removed in one
of the commits.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1160

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1000.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1000


commit f939fc27fbcc8dd1fc1e3b9d8948f1994fc399af
Author: Priyank <ps...@hortonworks.com>
Date:   2016-01-09T01:15:51Z

STORM-1160: Add hadoop-auth dependency needed for storm-core

hadoop-auth is needed for storm ui in a secured cluster for
authentication. Adding the dependency back since it was removed in one
of the commits.




---


[GitHub] storm pull request: STORM-904: Move bin/storm command line to java...

2015-11-18 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/662#issuecomment-157804563
  
@hustfxj I might have missed something and it's been a long time since I did
this. I could not find any storm command that would not need a waitFor. Can
you give an example? If we do need that flag it should be easy to add and I
can change it.

@longdafeng I am not sure I understand the point about debugging. You can
still put a breakpoint in this java version of the storm command line. I
agree with you that it's easier to write new functions in a script, but the
whole point of this JIRA is to have one client that works on both Unix-based
systems and Windows. The last time I worked on this we had storm.py for Unix
and a storm batch client for Windows, and both of them also had a separate
file for environment variables. It is hard to maintain two scripts every time
we want to change the storm command. We can ask others for their opinion on
which of the two approaches is better.
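
For reference, a minimal sketch of the spawn-and-waitFor pattern under
discussion, including a shutdown hook like the one mentioned in a commit
below; this is an illustration, not the PR's actual code.

    public class LaunchAndWait {
        public static void main(String[] args) throws Exception {
            Process child = new ProcessBuilder(args).inheritIO().start();
            // Forward normal JVM termination to the child (does not cover kill -9).
            Runtime.getRuntime().addShutdownHook(new Thread(child::destroy));
            System.exit(child.waitFor());
        }
    }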


---


[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...

2015-11-05 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/854#issuecomment-154136188
  
@revans2 @Parth-Brahmbhatt @jerrypeng @harshach Thanks all for the feedback.
Other than the changes here, there are some commits to apache/master made
after this PR was created that added more UI apis using topology id, making
this PR unmergeable. I will sort all of that out and update the PR.


---


[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...

2015-11-04 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/854#issuecomment-153819160
  
@jerrypeng We have a check and it throws the following exception.

Exception in thread "main" java.lang.RuntimeException: Topology with name 
`wordcount` already exists on cluster
at 
backtype.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:231)
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:275)
at 
backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:311)
at 
backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:292)
at storm.starter.WordCountTopology.main(WordCountTopology.java:94)

@revans2 Thanks for chiming in.


---


[GitHub] storm pull request: STORM-1129: Use topology name instead of id in...

2015-11-03 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/854

STORM-1129: Use topology name instead of id in UI calls.

Note that all-topologies-summary has been used to get the topology id from a
topology name. That involves a few calls to zookeeper, which is not ideal;
however, the UI does not seem to take any significant performance hit. If
needed we can handle it using one of the options below (a rough sketch of
option 1 follows the list). Since it's a separate performance issue we can
handle it in a separate JIRA.

1. Have the nimbus thrift server cache the summary for topologies so it does
not hit zookeeper every time we try to get a topology id from a name.
2. Update the nimbus thrift api with a method that takes options and use that
to do only the minimal necessary interaction with zookeeper for a given option.
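
A rough sketch of option 1, assuming a hypothetical cache class rather than
the nimbus implementation: keep the name-to-id mapping for a short TTL so
repeated UI calls do not each hit zookeeper.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Function;

    public class TopologyIdCache {
        private static final long TTL_MS = 10_000L;

        private static final class Entry {
            final String id;
            final long loadedAt;
            Entry(String id, long loadedAt) { this.id = id; this.loadedAt = loadedAt; }
        }

        private final ConcurrentMap<String, Entry> byName = new ConcurrentHashMap<>();

        // loader stands in for the zookeeper-backed summary lookup
        public String getId(String name, Function<String, String> loader) {
            Entry e = byName.get(name);
            long now = System.currentTimeMillis();
            if (e == null || now - e.loadedAt > TTL_MS) {
                e = new Entry(loader.apply(name), now);
                byName.put(name, e);
            }
            return e.id;
        }
    }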

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1129

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/854.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #854


commit 77b73af52bdc2d691ae172a1a2905a94395c3c70
Author: Priyank <ps...@hortonworks.com>
Date:   2015-11-04T00:20:50Z

STORM-1129: Use topology name instead of id in UI calls.




---


[GitHub] storm pull request: STORM-997: Add proxy user functionality for st...

2015-10-05 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/692#issuecomment-145642316
  
@revans2 I think what you mentioned above would not happen, I mean the part
where you say that storm running in unsecured mode and hdfs in secured mode
will disable security. Please correct me if I am wrong; I am new to this. The
way I imagine it works is that the proxy user functionality code (the doAs
part) is executed only after the user is logged in to HDFS. In secured mode
the user will already have been validated using the keytab. On top of that,
hdfs needs config entries in core-site.xml for the proxy user functionality
to work; please look at the configurations section at
http://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-common/Superusers.html

hdfs in that case makes sure that if the logged-in user is not authorized to
impersonate another user based on that config, it throws an exception. Again,
there is a good chance I am missing something here, so please correct me if I
am wrong.
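
As an illustration of that flow, a minimal sketch using the standard Hadoop
UserGroupInformation API (the user name and call sites are assumptions): the
keytab login happens first, and the doAs only succeeds if the core-site.xml
proxyuser rules allow the logged-in user to impersonate the target user.

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    public class ProxyUserSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Keytab/kerberos login is already done; impersonation sits on top.
            UserGroupInformation login = UserGroupInformation.getLoginUser();
            UserGroupInformation proxy = UserGroupInformation.createProxyUser("alice", login);
            // HDFS rejects this unless hadoop.proxyuser.<login>.hosts and
            // hadoop.proxyuser.<login>.groups in core-site.xml permit it.
            proxy.doAs((PrivilegedExceptionAction<Void>) () -> {
                FileSystem fs = FileSystem.get(conf);
                System.out.println(fs.getFileStatus(fs.getHomeDirectory()));
                return null;
            });
        }
    }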


---


[GitHub] storm pull request: STORM-997: Add proxy user functionality for st...

2015-09-24 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/692#issuecomment-143101710
  
@Parth-Brahmbhatt @ptgoetz Can you please review this?


---


[GitHub] storm pull request: STORM-1017: Ignore ignoreZkOffsets in case of ...

2015-09-22 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/754#issuecomment-142448505
  
@harshach Created JIRA item for that. @ptgoetz can you please review this 
as well?


---


[GitHub] storm pull request: Storm-1017: Ignore ignoreZkOffsets in case of ...

2015-09-22 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/754

Storm-1017: Ignore ignoreZkOffsets in case of worker crash and restart.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1017

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/754.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #754


commit a89b7ff4b7c96f7998e3a0c673937f9ddefd59c9
Author: Priyank <ps...@hortonworks.com>
Date:   2015-09-15T06:27:01Z

STORM-1017: Ignore ignoreZkOffsets in case of worker crash and restart.

commit 56cfdbf8d9b3b6044d7aa05dfb6b501c24b2f277
Author: Priyank <ps...@hortonworks.com>
Date:   2015-09-22T16:14:26Z

STORM-1017: Ignore ignoreZkOffsets for trident.




---


[GitHub] storm pull request: STORM-1039: Remove commons-codec shading, comm...

2015-09-10 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/730

STORM-1039: Remove commons-codec shading, commons-codec is used by ha…

…doop authentication library which is used during ui authentication in 
secured environment.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-1039

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/730.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #730


commit 5b50bb9ecdccfdc9abb189553f3fe8dc7650be57
Author: Priyank <ps...@hortonworks.com>
Date:   2015-09-10T00:14:00Z

STORM-1039: Remove commons-codec shading, commons-codec is used by hadoop 
authentication library which is used during ui authentication in secured 
environment.




---


[GitHub] storm pull request: STORM-997: Add proxy user functionality for st...

2015-08-31 Thread priyank5485
Github user priyank5485 commented on the pull request:

https://github.com/apache/storm/pull/692#issuecomment-136490586
  
@harshach thanks for the review. Yes, I will do it. I realized after creating
the pull request that it needs to be added for trident as well.


---


[GitHub] storm pull request: STORM-997: Add proxy user functionality for st...

2015-08-20 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/692

STORM-997: Add proxy user functionality for storm hdfs connector.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-997

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/692.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #692


commit 96903303ccd9087fad27cf4bc4081be1332344f6
Author: Priyank ps...@hortonworks.com
Date:   2015-08-20T07:21:16Z

STORM-997: Add proxy user functionality for storm hdfs connector.




---


[GitHub] storm pull request: STORM-965: Fix excessive logging

2015-08-14 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/686

STORM-965: Fix excessive logging

The reason for this pull request is that when a non-secure client tries to
connect to the nimbus host in a secure cluster, an exception was being thrown
and a null transport object was being returned. This null object in turn was
used by a thrift library class, which threw a null pointer exception. Both
the class where the exception was thrown and the one where it was caught were
logging it, creating unnecessarily lengthy log statements. Changing the log
level in storm code from error to debug took care of one part, but to work
around the exception being logged from the thrift jar a NoOpTTrasport.java
had to be created. Two main things to consider during review:
1. Change from Logger.error to Logger.debug in
KerberosSaslTransportPlugin.java
2. Early return of false in SaslTransportPlugin.java, which is presumed, and
tested with one nimbus client, to work okay.

Note: the TProcessor interface's process method does not have documentation
saying what the returned boolean indicates.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm STORM-965

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/686.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #686


commit f29de472a21a9f5936cb13d26cc5a36669c48164
Author: Priyank ps...@hortonworks.com
Date:   2015-08-13T22:42:24Z

STORM-965: Fix excessive logging




---


[GitHub] storm pull request: STORM-904: Move bin/storm command line to java...

2015-08-05 Thread priyank5485
Github user priyank5485 commented on a diff in the pull request:

https://github.com/apache/storm/pull/662#discussion_r36324914
  
--- Diff: storm-core/src/jvm/backtype/storm/utils/StormCommandExecutor.java ---
@@ -0,0 +1,785 @@
+package backtype.storm.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import clojure.lang.IFn;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.SystemUtils;
+
+/**
+ * Created by pshah on 7/17/15.
+ */
+abstract class StormCommandExecutor {
+    final String NIMBUS_CLASS = "backtype.storm.daemon.nimbus";
+    final String SUPERVISOR_CLASS = "backtype.storm.daemon.supervisor";
+    final String UI_CLASS = "backtype.storm.ui.core";
+    final String LOGVIEWER_CLASS = "backtype.storm.daemon.logviewer";
+    final String DRPC_CLASS = "backtype.storm.daemon.drpc";
+    final String REPL_CLASS = "clojure.main";
+    final String ACTIVATE_CLASS = "backtype.storm.command.activate";
+    final String DEACTIVATE_CLASS = "backtype.storm.command.deactivate";
+    final String REBALANCE_CLASS = "backtype.storm.command.rebalance";
+    final String LIST_CLASS = "backtype.storm.command.list";
+    String stormHomeDirectory;
+    String userConfDirectory;
+    String stormConfDirectory;
+    String clusterConfDirectory;
+    String stormLibDirectory;
+    String stormBinDirectory;
+    String stormLog4jConfDirectory;
+    String configFile = "";
+    String javaCommand;
+    List<String> configOptions = new ArrayList<String>();
+    String stormExternalClasspath;
+    String stormExternalClasspathDaemon;
+    String fileSeparator;
+    final List<String> COMMANDS = Arrays.asList("jar", "kill", "shell",
+            "nimbus", "ui", "logviewer", "drpc", "supervisor",
+            "localconfvalue", "remoteconfvalue", "repl", "classpath",
+            "activate", "deactivate", "rebalance", "help", "list",
+            "dev-zookeeper", "version", "monitor", "upload-credentials",
+            "get-errors");
+
+    public static void main (String[] args) {
+        for (String arg : args) {
+            System.out.println("Argument" + " is " + arg);
+        }
+        StormCommandExecutor stormCommandExecutor;
+        if (System.getProperty("os.name").startsWith("Windows")) {
+            stormCommandExecutor = new WindowsStormCommandExecutor();
+        } else {
+            stormCommandExecutor = new UnixStormCommandExecutor();
+        }
+        stormCommandExecutor.initialize();
+        stormCommandExecutor.execute(args);
+    }
+
+    StormCommandExecutor () {
+
+    }
+
+    abstract void initialize ();
+
+    abstract void execute (String[] args);
+
+    void callMethod (String command, List<String> args) {
+        Class implementation = this.getClass();
+        String methodName = command.replace("-", "") + "Command";
+        try {
+            Method method = implementation.getDeclaredMethod(methodName, List.class);
+            method.invoke(this, args);
+        } catch (NoSuchMethodException ex) {
+            System.out.println("No such method exception occured while trying" +
+                    " to run storm method " + command);
+        } catch (IllegalAccessException ex) {
+            System.out.println("Illegal access exception occured while trying" +
+                    " to run storm method " + command);
+        } catch (IllegalArgumentException ex) {
+            System.out.println("Illegal argument exception occured while trying" +
+                    " to run storm method " + command);
+        } catch (InvocationTargetException ex) {
+            System.out.println("Invocation target exception occured while trying" +
+                    " to run storm method " + command);
+        }
+    }
+}
+
+class UnixStormCommandExecutor extends StormCommandExecutor {
+
+    UnixStormCommandExecutor () {
+
+    }
+
+    void initialize () {
+        Collections.sort(this.COMMANDS);
+        this.fileSeparator = System.getProperty("file.separator");
+        this.stormHomeDirectory = System.getenv("STORM_BASE_DIR");
+        this.userConfDirectory = System.getProperty("user.home") +
+                this.fileSeparator + "" +
+                ".storm";
+        this.stormConfDirectory = System.getenv("STORM_CONF_DIR");
+        this.clusterConfDirectory = this.stormConfDirectory == null ? (this.stormHomeDirectory + this.fileSeparator

[GitHub] storm pull request: STORM-904: Move bin/storm command line to java...

2015-07-31 Thread priyank5485
GitHub user priyank5485 opened a pull request:

https://github.com/apache/storm/pull/662

STORM-904: Move bin/storm command line to java.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/priyank5485/storm storm-904

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/662.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #662


commit 193f59e027a8ab95d72c8e386d66f0b90f9b86f9
Author: Priyank ps...@hortonworks.com
Date:   2015-07-17T23:45:13Z

Remove python and add a java class to support storm cli.

Add a java class that encapsulates storm cli functionality currently
coded in two different places. storm, storm.py for bash and storm.cmd
for windows. Now both storm and storm.cmd will call the java class
eliminating python code and putting all the code in one place for bash
and windows respectively.

commit 793781cfd55c56f7184910fd744341351f6cc4bf
Author: Priyank ps...@hortonworks.com
Date:   2015-07-20T23:04:48Z

Add apace commons cli library for command line parsing

commit a425fc50ab9b518337d0c095ae80e9d78221e3fd
Author: Priyank ps...@hortonworks.com
Date:   2015-07-21T17:44:30Z

Adding version for dependency in pom file.

commit 5a4ec36ac17af69297dfc73aaa682b8e43edf5de
Author: Priyank ps...@hortonworks.com
Date:   2015-07-21T21:23:54Z

Sample code to test for apache commons cli.

commit 1a8240a558516110a5c14d3a4bb0b177ab76090b
Author: Priyank ps...@hortonworks.com
Date:   2015-07-22T23:40:40Z

Removing apache commons cli, renaming and some intiialization code.

commit b69e295e81216e992187737cc069ff66d2e74687
Author: Priyank ps...@hortonworks.com
Date:   2015-07-23T10:14:48Z

Some redesigning/refactoring.

commit 8c2fbeadbf699b5b286479649d133623c92bc85c
Author: Priyank ps...@hortonworks.com
Date:   2015-07-23T21:49:43Z

Add printUsage mainly.

commit 74f45c7c9d6ae813635c91ba14fdb1cd3dd24c74
Author: Priyank ps...@hortonworks.com
Date:   2015-07-24T18:49:15Z

Call storm command methods using reflection.

commit 10ba25f8c81e2fdc34f4ae3e0e2cdf9ecd142a4a
Author: Priyank ps...@hortonworks.com
Date:   2015-07-24T19:25:01Z

Add method boxes without code for other storm commands.

commit fb2022db8dda7a5183ee5a2cc64a7ef035006c80
Author: Priyank ps...@hortonworks.com
Date:   2015-07-27T17:59:20Z

Initial executeStorm method creating a new process.

Need to test storm daemons if they can be killed using shutdown hooks.

commit 493b0dfeb7be2daa0a54e7dec2f3f716d0af0e26
Author: Priyank ps...@hortonworks.com
Date:   2015-07-29T07:02:49Z

Ship some more code over from python.

commit 3082d4f5ea2d66babe232e5ff49506b322385473
Author: Priyank ps...@hortonworks.com
Date:   2015-07-29T18:49:57Z

Fix an exception which failed to start the nimbus command.

commit c616068d90dd47fa61fbb670c2853805d7ccb324
Author: Priyank ps...@hortonworks.com
Date:   2015-07-29T21:53:27Z

Use exec to replace current process rather than creating a new one.

commit 06f4dcc17d8dcf3c81094557ed97ff117d6410ed
Author: Priyank ps...@hortonworks.com
Date:   2015-07-30T08:37:08Z

Add some commented code.

commit da0ad60438841a1f874c115cef27190a59401f4f
Author: Priyank ps...@hortonworks.com
Date:   2015-07-30T15:45:52Z

Add nil check for strings returned from conclave

commit 0b242adf0bc261e9140c3587b3eedd2141e31718
Author: Priyank ps...@hortonworks.com
Date:   2015-07-30T19:04:35Z

Fix an issue with passing options to storm command,

commit 0bb15782c171d336be905d2f5143865e288a8239
Author: Priyank ps...@hortonworks.com
Date:   2015-07-31T00:37:07Z

STORM-904: Add shutdown hook thread to kill subprocess.

Does not work for kill -9. Need to debug.




---

