[jira] [Created] (STORM-2229) KafkaSpout does not resend failed tuples

2016-11-30 Thread Matthias Klein (JIRA)
Matthias Klein created STORM-2229:
-

 Summary: KafkaSpout does not resend failed tuples
 Key: STORM-2229
 URL: https://issues.apache.org/jira/browse/STORM-2229
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-kafka-client
Affects Versions: 1.0.0, 1.0.1, 1.0.2
Reporter: Matthias Klein


When the topology fails a tuple, it is never resent by the KafkaSpout. This can 
easily be shown by constructing a small topology that fails every tuple.
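
A minimal reproduction bolt might look like the following (an illustrative 
sketch only; the class name and wiring are not taken from an actual test 
topology):

{code}
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical bolt used only to reproduce the problem: it fails every tuple,
// so a correctly behaving KafkaSpout should replay the same records forever.
public class FailEveryTupleBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        collector.fail(tuple);  // never ack, always fail
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams needed for the reproduction
    }
}
{code}

With the current KafkaSpout, the failed offsets are never seen again.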

Apparent reason:

{code}
public class KafkaSpout extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
            } else {
                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);  // Seek to last committed offset <== Does seek to end of partition
            }
        }
    }
{code}

The code seeks to the end of the partition instead of seeking to the first 
uncommitted offset.

Preliminary fix (this worked for me, but it needs to be checked by an expert):

{code}
private void doSeekRetriableTopicPartitions() {
    final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

    for (TopicPartition rtp : retriableTopicPartitions) {
        final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
        if (offsetAndMeta != null) {
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
        } else {
            OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
            if (committed == null) {
                // No offsets committed yet for this partition - start from the beginning
                kafkaConsumer.seekToBeginning(toArrayList(rtp));
            } else {
                // Seek to first uncommitted offset
                kafkaConsumer.seek(rtp, committed.offset() + 1);
            }
        }
    }
}
{code}




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-2077) KafkaSpout doesn't retry failed tuples

2016-11-30 Thread Matthias Klein (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1573#comment-1573
 ] 

Matthias Klein commented on STORM-2077:
---

By the way, this issue is not assigned to the right component; it should be 
storm-kafka-client. Opening a new issue.

> KafkaSpout doesn't retry failed tuples
> --
>
> Key: STORM-2077
> URL: https://issues.apache.org/jira/browse/STORM-2077
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.0.2
>Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
> Map props = new HashMap<>();
> props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
> props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
> ByteArrayDeserializer.class.getName());
> props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
> ByteArrayDeserializer.class.getName());
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> broker.bootstrapServer());
> KafkaSpoutStreams kafkaSpoutStreams = new 
> KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new 
> String[]{"test-topic"}).build();
> KafkaSpoutTuplesBuilder kafkaSpoutTuplesBuilder = new 
> KafkaSpoutTuplesBuilder.Builder<>(new 
> KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
> KafkaSpoutRetryService retryService = new 
> KafkaSpoutLoggedRetryExponentialBackoff(KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>  KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), 3, 
> KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
> KafkaSpoutConfig config = new 
> KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, 
> retryService)
> .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
> .setMaxUncommittedOffsets(30)
> .setOffsetCommitPeriodMs(10)
> .setMaxRetries(3)
> .build();
> kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple and we expect that those tuples will 
> all be replayed. But that's not the case for every tuple.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (STORM-2224) Expose a method to override in computing the field from given tuple in FieldSelector

2016-11-30 Thread Satish Duggana (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana resolved STORM-2224.
---
Resolution: Fixed

> Expose a method to override in computing the field from given tuple in 
> FieldSelector
> 
>
> Key: STORM-2224
> URL: https://issues.apache.org/jira/browse/STORM-2224
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-cassandra
>Affects Versions: 2.0.0, 1.x
>Reporter: Satish Duggana
>Assignee: Satish Duggana
> Fix For: 2.0.0, 1.x
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> org.apache.storm.cassandra.query.selector.FieldSelector should give a way to 
> customize computing field value from tuple.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (STORM-2220) Adding config keys for CassandraBolts instead of taking at topology level configuration.

2016-11-30 Thread Satish Duggana (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana resolved STORM-2220.
---
Resolution: Fixed

> Adding config keys for CassandraBolts instead of taking at topology level 
> configuration.
> 
>
> Key: STORM-2220
> URL: https://issues.apache.org/jira/browse/STORM-2220
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-cassandra
>Affects Versions: 2.0.0, 1.x
>Reporter: Satish Duggana
>Assignee: Satish Duggana
> Fix For: 2.0.0, 1.x
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently Cassandra bolts take the Cassandra cluster configuration from the 
> storm topology configuration. This is restrictive when a topology has two 
> different Cassandra bolts talking to different Cassandra endpoints. Give a way 
> to pass a Cassandra conf to any Cassandra bolt.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (STORM-2191) shorten classpaths in worker and LogWriter commands

2016-11-30 Thread Erik Weathers (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709779#comment-15709779
 ] 

Erik Weathers edited comment on STORM-2191 at 11/30/16 9:10 PM:


[~revans2], [~ptgoetz], [~kabhwan]:  I'm working on fixing this issue, but I've 
noticed that in newer (as compared to 0.9.x) storm versions the 
{{get_jars_full}} logic has spread into the [core Storm Java 
code|https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java#L330-L356]
 now too, and there's [*more* 
uses|https://github.com/apache/storm/blob/285668742742da6316d38c1ef492c7c873b5a649/bin/storm.py#L131-L138]
 of {{get_jars_full}} within {{bin/storm.py}}.   I do not understand the 
purpose for this logic and intend to submit a PR that replaces it with simply 
putting a wildcard in the dir that is being expanded.  Without this change the 
commands get obscenely long (as explained in the Description of this ticket).
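
To make that concrete, here is a rough sketch of the two approaches 
(illustrative only; this is not the actual {{get_jars_full}} or BasicContainer 
code, and the helper names are made up):

{code}
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Illustrative contrast between the two ways of building the lib/ classpath entry.
public class ClasspathSketch {
    // Roughly what the current expansion does: list every jar explicitly,
    // which is what makes the worker/LogWriter commands so long.
    static String expandedLibClasspath(String libDir) {
        List<String> entries = new ArrayList<>();
        File[] jars = new File(libDir).listFiles((dir, name) -> name.endsWith(".jar"));
        if (jars != null) {
            for (File jar : jars) {
                entries.add(jar.getAbsolutePath());
            }
        }
        return String.join(File.pathSeparator, entries);
    }

    // The proposed alternative: a single wildcard entry; the JVM itself expands
    // "lib/*" to every jar in that directory at launch time.
    static String wildcardLibClasspath(String libDir) {
        return libDir + File.separator + "*";
    }
}
{code}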

Can you please let me know if this would be unacceptable?  I *really* don't 
want to have to forever fork and hack the core storm repo in order to support 
running the storm daemons from a deep directory (as is necessitated when using 
non-dockerized Mesos).

Thanks!!

- Erik


was (Author: erikdw):
[~revans2], [~ptgoetz], [~kabhwan]:  I'm working on fixing this issue, but I've 
noticed that in newer (as compared to 0.9.x) storm versions the 
{{get_jars_full}} logic has spread into the [core Storm Java 
code|https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java#L330-L356]
 now too, and there's [*more* 
uses|https://github.com/apache/storm/blob/285668742742da6316d38c1ef492c7c873b5a649/bin/storm.py#L131-L138]
 of {{get_jars_full}} within {{bin/storm.py}}.   I do not understand the 
purpose for this logic and intend to submit a PR that replaces it with simply 
putting a wildcard in the dir that is being expanded.  Without this change the 
commands get obscenely long (as explained in the Description of this ticket).

Can you please let me know if this would be unacceptable?  I *really* don't 
want to have to forever fork and hack the core storm repo in order to support 
running the storm jar from a deep directory (as is necessitated when using 
non-dockerized Mesos).

Thanks!!

- Erik

> shorten classpaths in worker and LogWriter commands
> ---
>
> Key: STORM-2191
> URL: https://issues.apache.org/jira/browse/STORM-2191
> Project: Apache Storm
>  Issue Type: Task
>  Components: storm-core
>Affects Versions: 1.0.2
>Reporter: Erik Weathers
>Priority: Minor
>  Labels: cli, command-line
>
> When launching the worker daemon and its wrapping LogWriter daemon, the 
> commands can become so long that they eclipse the default Linux limit of 4096 
> bytes. That results in commands that are cut off in {{ps}} output, and 
> prevents easily inspecting the system to see even what processes are running.
> The specific scenario in which this problem can be easily triggered: *running 
> Storm on Mesos*.
> h5. Details on why it happens:
> # using the default Mesos containerizer instead of Docker containers, which 
> causes the storm-mesos package to be unpacked into the Mesos executor sandbox.
> # The ["expand all jars on 
> classpath"|https://github.com/apache/storm/blob/6dc6407a01d032483edebb1c1b4d8b69a304d81c/bin/storm.py#L114-L140]
>  functionality in the {{bin/storm.py}} script causes every one of the jars 
> that storm bundles into its lib directory to be explicitly listed in the 
> command.
> #* e.g., say the mesos work dir is {{/var/run/mesos/work_dir/}}
> #* and say that the original classpath argument in the supervisor cmd 
> includes the following for the {{lib/}} dir in the binary storm package:
> #** 
> {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/*}}
> #* That leads to a hugely expanded classpath argument for the LogWriter and 
> Worker daemons that get launched:
> #** 
> {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/asm-4.0.jar:/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/carbonite-1.4.0.jar:...}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

[jira] [Comment Edited] (STORM-2191) shorten classpaths in worker and LogWriter commands

2016-11-30 Thread Erik Weathers (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709779#comment-15709779
 ] 

Erik Weathers edited comment on STORM-2191 at 11/30/16 9:08 PM:


[~revans2], [~ptgoetz], [~kabhwan]:  I'm working on fixing this issue, but I've 
noticed that in newer (as compared to 0.9.x) storm versions the 
{{get_jars_full}} logic has spread into the [core Storm Java 
code|https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java#L330-L356]
 now too, and there's [*more* 
uses|https://github.com/apache/storm/blob/285668742742da6316d38c1ef492c7c873b5a649/bin/storm.py#L131-L138]
 of {{get_jars_full}} within {{bin/storm.py}}.   I do not understand the 
purpose for this logic and intend to submit a PR that replaces it with simply 
putting a wildcard in the dir that is being expanded.  Without this change the 
commands get obscenely long (as explained in the Description of this ticket).

Can you please let me know if this would be unacceptable?  I *really* don't 
want to have to forever fork and hack the core storm repo in order to support 
running the storm jar from a deep directory (as is necessitated when using 
non-dockerized Mesos).

Thanks!!

- Erik


was (Author: erikdw):
[~revans2], [~ptgoetz], [~kabhwan]:  I'm working on fixing this issue, but I've 
noticed that in newer (as compared to 0.9.x) storm versions the 
{{get_jars_full}} logic has spread into the [core Storm Java 
code|https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java#L330-L356]
 now too, and there's [*more* 
uses|https://github.com/apache/storm/blob/285668742742da6316d38c1ef492c7c873b5a649/bin/storm.py#L131-L138]
 of {{get_jars_full}} within {{bin/storm.py}}.   I do not understand the 
purpose for this logic and intend to submit a PR that replaces it with simply 
putting a wildcard in the dir that is being expanded.  Without this the 
commands get obscenely long (as explained in the Description of this ticket).

Can you please let me know if this would be unacceptable?  I *really* don't 
want to have to forever fork and hack the core storm repo in order to support 
running the storm jar from a deep directory (as is necessitated when using 
non-dockerized Mesos).

Thanks!!

- Erik

> shorten classpaths in worker and LogWriter commands
> ---
>
> Key: STORM-2191
> URL: https://issues.apache.org/jira/browse/STORM-2191
> Project: Apache Storm
>  Issue Type: Task
>  Components: storm-core
>Affects Versions: 1.0.2
>Reporter: Erik Weathers
>Priority: Minor
>  Labels: cli, command-line
>
> When launching the worker daemon and its wrapping LogWriter daemon, the 
> commands can become so long that they eclipse the default Linux limit of 4096 
> bytes. That results in commands that are cut off in {{ps}} output, and 
> prevents easily inspecting the system to see even what processes are running.
> The specific scenario in which this problem can be easily triggered: *running 
> Storm on Mesos*.
> h5. Details on why it happens:
> # using the default Mesos containerizer instead of Docker containers, which 
> causes the storm-mesos package to be unpacked into the Mesos executor sandbox.
> # The ["expand all jars on 
> classpath"|https://github.com/apache/storm/blob/6dc6407a01d032483edebb1c1b4d8b69a304d81c/bin/storm.py#L114-L140]
>  functionality in the {{bin/storm.py}} script causes every one of the jars 
> that storm bundles into its lib directory to be explicitly listed in the 
> command.
> #* e.g., say the mesos work dir is {{/var/run/mesos/work_dir/}}
> #* and say that the original classpath argument in the supervisor cmd 
> includes the following for the {{lib/}} dir in the binary storm package:
> #** 
> {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/*}}
> #* That leads to a hugely expanded classpath argument for the LogWriter and 
> Worker daemons that get launched:
> #** 
> {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/asm-4.0.jar:/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/carbonite-1.4.0.jar:...}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-2191) shorten classpaths in worker and LogWriter commands

2016-11-30 Thread Erik Weathers (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709779#comment-15709779
 ] 

Erik Weathers commented on STORM-2191:
--

[~revans2], [~ptgoetz], [~kabhwan]:  I'm working on fixing this issue, but I've 
noticed that in newer (as compared to 0.9.x) storm versions the 
{{get_jars_full}} logic has spread into the [core Storm Java 
code|https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java#L330-L356]
 now too, and there's [*more* 
uses|https://github.com/apache/storm/blob/285668742742da6316d38c1ef492c7c873b5a649/bin/storm.py#L131-L138]
 of {{get_jars_full}} within {{bin/storm.py}}.   I do not understand the 
purpose for this logic and intend to submit a PR that replaces it with simply 
putting a wildcard in the dir that is being expanded.  Without this the 
commands get obscenely long (as explained in the Description of this ticket).

Can you please let me know if this would be unacceptable?  I *really* don't 
want to have to forever fork and hack the core storm repo in order to support 
running the storm jar from a deep directory (as is necessitated when using 
non-dockerized Mesos).

Thanks!!

- Erik

> shorten classpaths in worker and LogWriter commands
> ---
>
> Key: STORM-2191
> URL: https://issues.apache.org/jira/browse/STORM-2191
> Project: Apache Storm
>  Issue Type: Task
>  Components: storm-core
>Affects Versions: 1.0.2
>Reporter: Erik Weathers
>Priority: Minor
>  Labels: cli, command-line
>
> When launching the worker daemon and its wrapping LogWriter daemon, the 
> commands can become so long that they eclipse the default Linux limit of 4096 
> bytes. That results in commands that are cut off in {{ps}} output, and 
> prevents easily inspecting the system to see even what processes are running.
> The specific scenario in which this problem can be easily triggered: *running 
> Storm on Mesos*.
> h5. Details on why it happens:
> # using the default Mesos containerizer instead of Docker containers, which 
> causes the storm-mesos package to be unpacked into the Mesos executor sandbox.
> # The ["expand all jars on 
> classpath"|https://github.com/apache/storm/blob/6dc6407a01d032483edebb1c1b4d8b69a304d81c/bin/storm.py#L114-L140]
>  functionality in the {{bin/storm.py}} script causes every one of the jars 
> that storm bundles into its lib directory to be explicitly listed in the 
> command.
> #* e.g., say the mesos work dir is {{/var/run/mesos/work_dir/}}
> #* and say that the original classpath argument in the supervisor cmd 
> includes the following for the {{lib/}} dir in the binary storm package:
> #** 
> {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/*}}
> #* That leads to a hugely expanded classpath argument for the LogWriter and 
> Worker daemons that get launched:
> #** 
> {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/asm-4.0.jar:/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/carbonite-1.4.0.jar:...}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (STORM-2191) shorten classpaths in worker and LogWriter commands

2016-11-30 Thread Erik Weathers (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709779#comment-15709779
 ] 

Erik Weathers edited comment on STORM-2191 at 11/30/16 9:07 PM:


[~revans2], [~ptgoetz], [~kabhwan]:  I'm working on fixing this issue, but I've 
noticed that in newer (as compared to 0.9.x) storm versions the 
{{get_jars_full}} logic has spread into the [core Storm Java 
code|https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java#L330-L356]
 now too, and there's [*more* 
uses|https://github.com/apache/storm/blob/285668742742da6316d38c1ef492c7c873b5a649/bin/storm.py#L131-L138]
 of {{get_jars_full}} within {{bin/storm.py}}.   I do not understand the 
purpose for this logic and intend to submit a PR that replaces it with simply 
putting a wildcard in the dir that is being expanded.  Without this the 
commands get obscenely long (as explained in the Description of this ticket).

Can you please let me know if this would be unacceptable?  I *really* don't 
want to have to forever fork and hack the core storm repo in order to support 
running the storm jar from a deep directory (as is necessitated when using 
non-dockerized Mesos).

Thanks!!

- Erik


was (Author: erikdw):
[~revans2], [~ptgoetz], [~kabhwan]:  I'm working on fixing this issue, but I've 
noticed that in newer (as compared to 0.9.x) storm versions the 
{{get_jars_full}} logic has spread into the [core Storm Java 
code|https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java#L330-L356]
 now too, and there's [*more* 
uses|https://github.com/apache/storm/blob/285668742742da6316d38c1ef492c7c873b5a649/bin/storm.py#L131-L138]
 of {{get_jars_full}} within {{bin/storm.py}}.   I do not understand the 
purpose for this logic and intend to submit a PR that replaces it with simply 
putting a wildcard in the dir that is being expanded.  Without this the 
commands get obscenely long (as explained in the Description of this ticket).

Can you please let me know if this would be unacceptable?  I *really* don't 
want to have to forever fork and hack the core storm repo in order to support 
running the storm jar from a deep directory (as is necessitated when using 
non-dockerized Mesos).

Thanks!!

- Erik

> shorten classpaths in worker and LogWriter commands
> ---
>
> Key: STORM-2191
> URL: https://issues.apache.org/jira/browse/STORM-2191
> Project: Apache Storm
>  Issue Type: Task
>  Components: storm-core
>Affects Versions: 1.0.2
>Reporter: Erik Weathers
>Priority: Minor
>  Labels: cli, command-line
>
> When launching the worker daemon and its wrapping LogWriter daemon, the 
> commands can become so long that they eclipse the default Linux limit of 4096 
> bytes. That results in commands that are cut off in {{ps}} output, and 
> prevents easily inspecting the system to see even what processes are running.
> The specific scenario in which this problem can be easily triggered: *running 
> Storm on Mesos*.
> h5. Details on why it happens:
> # using the default Mesos containerizer instead of Docker containers, which 
> causes the storm-mesos package to be unpacked into the Mesos executor sandbox.
> # The ["expand all jars on 
> classpath"|https://github.com/apache/storm/blob/6dc6407a01d032483edebb1c1b4d8b69a304d81c/bin/storm.py#L114-L140]
>  functionality in the {{bin/storm.py}} script causes every one of the jars 
> that storm bundles into its lib directory to be explicitly listed in the 
> command.
> #* e.g., say the mesos work dir is {{/var/run/mesos/work_dir/}}
> #* and say that the original classpath argument in the supervisor cmd 
> includes the following for the {{lib/}} dir in the binary storm package:
> #** 
> {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/*}}
> #* That leads to a hugely expanded classpath argument for the LogWriter and 
> Worker daemons that get launched:
> #** 
> {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/asm-4.0.jar:/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/carbonite-1.4.0.jar:...}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams

2016-11-30 Thread Robert Joseph Evans (JIRA)
Robert Joseph Evans created STORM-2228:
--

 Summary: KafkaSpout does not replay properly when a topic maps to 
multiple streams
 Key: STORM-2228
 URL: https://issues.apache.org/jira/browse/STORM-2228
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-kafka-client
Affects Versions: 1.0.0, 2.0.0, 1.0.1, 1.0.2, 1.1.0, 1.0.3
Reporter: Robert Joseph Evans
Assignee: Robert Joseph Evans
Priority: Blocker


In the example KafkaSpoutTopologyMainNamedTopics.java, the code creates a 
TuplesBuilder and a KafkaSpoutStreams:

{code}
protected KafkaSpoutTuplesBuilder getTuplesBuilder() {
    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
            new TopicsTest0Test1TupleBuilder(TOPICS[0], TOPICS[1]),
            new TopicTest2TupleBuilder(TOPICS[2]))
            .build();
}

protected KafkaSpoutStreams getKafkaSpoutStreams() {
    final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
    final Fields outputFields1 = new Fields("topic", "partition", "offset");
    return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to test_stream
        .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})   // contents of topic test2 sent to test_stream
        .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
        .build();
}
{code}

Essentially the code is trying to take {{TOPICS\[0]}}, {{TOPICS\[1]}}, and 
{{TOPICS\[2]}}, translate them to {{Fields("topic", "partition", "offset", 
"key", "value")}}, and output them on {{STREAMS\[0]}}. Then, just for 
{{TOPICS\[2]}}, they want it to be output as {{Fields("topic", "partition", 
"offset")}} to {{STREAMS\[2]}}.  (Don't know what happened to {{STREAMS\[1]}}.)

There are two issues here.  The first is with how the TupleBuilder and the 
SpoutStreams are split up but coupled: {{STREAMS\[2]}} is actually getting the 
full "topic" "partition" "offset" "key" "value", but this is minor.  The real 
issue is that the code uses the same KafkaSpoutMessageId for all the tuples 
emitted to both {{STREAMS\[1]}} and {{STREAMS\[2]}}.

https://git.corp.yahoo.com/storm/storm/blob/5bcbb8d6d700d0d238d23f8f6d3976667aaedab9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L284-L304

The code, however, is written to assume that it will only ever get one ack/fail 
for a given KafkaSpoutMessageId.  This means that if one of the emitted tuple 
trees succeeds and then the other fails, the failure will not result in anything 
being replayed!  This violates how Storm is intended to work.
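
For clarity, the reference counting mentioned further down would amount to 
something like this (an illustrative sketch only, not existing KafkaSpout code): 
count how many tuples were emitted for a message id and only retire or retry the 
id once every one of them has been acked or failed.

{code}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: one instance per KafkaSpoutMessageId, with emittedCount
// set to the number of streams the record was emitted on.
public class EmitRefCount {
    private final AtomicInteger pending;                           // tuples still awaiting ack/fail
    private final AtomicBoolean anyFailed = new AtomicBoolean(false);

    public EmitRefCount(int emittedCount) {
        this.pending = new AtomicInteger(emittedCount);
    }

    /** @return true when the last outstanding tuple was acked and nothing failed (safe to commit). */
    public boolean onAck() {
        return pending.decrementAndGet() == 0 && !anyFailed.get();
    }

    /** @return true when the last outstanding tuple completed and at least one failed (needs retry). */
    public boolean onFail() {
        anyFailed.set(true);
        return pending.decrementAndGet() == 0;
    }
}
{code}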

I discovered this as a part of STORM-2225, and I am fine with fixing it on 
STORM-2225 (I would just remove support for that functionality because there 
are other ways of doing this correctly).  But that would not maintain backwards 
compatibility and I am not sure it would be appropriate for 1.x releases.  I 
really would like to have feedback from others on this.

I can put something into 1.x where it will throw an exception if acking is 
enabled and this situation is present, but I don't want to spend the time trying 
to do reference counting on the number of tuples actually emitted.  If someone 
else wants to do that I would be happy to turn this JIRA over to them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (STORM-2227) Better User Control of GC Options

2016-11-30 Thread Derek Dagit (JIRA)
Derek Dagit created STORM-2227:
--

 Summary: Better User Control of GC Options
 Key: STORM-2227
 URL: https://issues.apache.org/jira/browse/STORM-2227
 Project: Apache Storm
  Issue Type: Improvement
  Components: storm-core
Affects Versions: 0.10.3
Reporter: Derek Dagit
Priority: Minor


As a user, I would like to override certain JVM garbage collection options 
instead of overriding them all, so that I can make simpler, safer changes.

Currently, if a user wants to add some gc option in a topology, the user must 
copy everything from {{worker.gc.childopts}} to 
{{topology.worker.gc.childopts}} and make needed edits/additions.  This is 
error prone, since the provided cluster-wide options can change, and because 
they are overwritten by default.

A user can easily override settings unwittingly by adding new options if they 
forget to also copy the cluster-wide settings.
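
As a concrete illustration (a sketch only; the GC flags shown are made-up 
example values, and package names are as in Storm 1.x), adding one flag today 
means restating the whole cluster-wide value:

{code}
import org.apache.storm.Config;

// Illustrative only: topology.worker.gc.childopts replaces worker.gc.childopts
// wholesale, so the cluster defaults have to be copied by hand into the override.
public class GcOptsExample {
    public static Config gcConf() {
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_WORKER_GC_CHILDOPTS,
                "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC "  // copied cluster-wide defaults (example values)
              + "-XX:+PrintGCDetails");                      // the one option the topology actually wanted to add
        return conf;
    }
}
{code}

Forgetting the copied part silently drops the cluster defaults, which is exactly 
the error-prone behavior described above.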



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (STORM-2174) Initial Scaffolding for a Beam Pipeline Runner

2016-11-30 Thread P. Taylor Goetz (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

P. Taylor Goetz resolved STORM-2174.

Resolution: Fixed

Merged to beam-runner feature branch.

> Initial Scaffolding for a Beam Pipeline Runner
> --
>
> Key: STORM-2174
> URL: https://issues.apache.org/jira/browse/STORM-2174
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: P. Taylor Goetz
>Assignee: P. Taylor Goetz
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Create a module under external with an implementation of the basic 
> scaffolding, etc. for a beam runner.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (STORM-1886) Extend KeyValueState interface with delete method

2016-11-30 Thread Arun Mahadevan (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arun Mahadevan updated STORM-1886:
--
Assignee: Balazs Kossovics

> Extend KeyValueState interface with delete method
> -
>
> Key: STORM-1886
> URL: https://issues.apache.org/jira/browse/STORM-1886
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Balazs Kossovics
>Assignee: Balazs Kossovics
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Even if the implementation of checkpointing only uses the get/put methods of 
> the KeyValueState interface, the existence of a delete method could be really 
> useful in the general case.
> I made a first implementation; what do you think about it?
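
For reference, the proposed extension would have roughly the following shape (a 
sketch only; the existing get/put signatures here are recalled from memory and 
should be checked against the real org.apache.storm.state.KeyValueState):

{code}
import org.apache.storm.state.State;

// Sketch of KeyValueState with the proposed delete operation added.
public interface KeyValueState<K, V> extends State {
    void put(K key, V value);
    V get(K key);
    V get(K key, V defaultValue);

    /** Proposed addition: remove the mapping for the key and return the previous value, if any. */
    V delete(K key);
}
{code}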



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (STORM-2077) KafkaSpout doesn't retry failed tuples

2016-11-30 Thread Matthias Klein (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708652#comment-15708652
 ] 

Matthias Klein edited comment on STORM-2077 at 11/30/16 1:54 PM:
-

The problem seems to be the following piece of code:

{code}
public class KafkaSpout extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        //...
        if (offsetAndMeta != null) {
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
        } else {
            kafkaConsumer.seekToEnd(toArrayList(rtp));  // Seek to last committed offset <-- Indeed seeks to end of partition
        }
{code}

However, this code seeks to the end of the partition, not to the last committed 
offset. The following change seems to repair the issue:

{code}
if (offsetAndMeta != null) {
    kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
} else {
    OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
    if (committed == null) {
        // Nothing yet committed
        kafkaConsumer.seekToBeginning(toArrayList(rtp));
    } else {
        // seek to entry after last committed offset
        kafkaConsumer.seek(rtp, committed.offset() + 1);
    }
{code}


was (Author: db3f):
The problem seems to be the following piece of code:

{code}
public class KafkaSpout extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        //...
        if (offsetAndMeta != null) {
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
        } else {
            kafkaConsumer.seekToEnd(toArrayList(rtp));  // Seek to last committed offset <-- Indeed seeks to end of partition
        }
{code}

However, this code seeks to the end of the partition, not to the last committed 
offset. The following change seems to repair the issue:

{code}
if (offsetAndMeta != null) {
    kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
} else {
    OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
    if (committed == null) {  // Nothing yet committed
        kafkaConsumer.seekToBeginning(toArrayList(rtp));
    } else {  // seek to entry after last committed offset
        kafkaConsumer.seek(rtp, committed.offset() + 1);
    }
{code}

> KafkaSpout doesn't retry failed tuples
> --
>
> Key: STORM-2077
> URL: https://issues.apache.org/jira/browse/STORM-2077
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.0.2
>Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
> Map props = new HashMap<>();
> props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
> props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
> ByteArrayDeserializer.class.getName());
> props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
> ByteArrayDeserializer.class.getName());
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> broker.bootstrapServer());
> KafkaSpoutStreams kafkaSpoutStreams = new 
> KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new 
> String[]{"test-topic"}).build();
> KafkaSpoutTuplesBuilder kafkaSpoutTuplesBuilder = new 
> KafkaSpoutTuplesBuilder.Builder<>(new 
> KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
> KafkaSpoutRetryService retryService = new 
> KafkaSpoutLoggedRetryExponentialBackoff(KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>  KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), 3, 
> KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
> KafkaSpoutConfig config = new 
> KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, 
> retryService)
> .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
> .setMaxUncommittedOffsets(30)
> .setOffsetCommitPeriodMs(10)
> .setMaxRetries(3)
> .build();
> kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple and we expect that those tuples will 
> all be replayed. But that's not the case for every tuple.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-2077) KafkaSpout doesn't retry failed tuples

2016-11-30 Thread Matthias Klein (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708652#comment-15708652
 ] 

Matthias Klein commented on STORM-2077:
---

The problem seems to be the following piece of code:

{code}
public class KafkaSpout extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        //...
        if (offsetAndMeta != null) {
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
        } else {
            kafkaConsumer.seekToEnd(toArrayList(rtp));  // Seek to last committed offset <-- Indeed seeks to end of partition
        }
{code}

However, this code seeks to the end of the partition, not to the last committed 
offset. The following change seems to repair the issue:

{code}
if (offsetAndMeta != null) {
    kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
} else {
    OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
    if (committed == null) {  // Nothing yet committed
        kafkaConsumer.seekToBeginning(toArrayList(rtp));
    } else {  // seek to entry after last committed offset
        kafkaConsumer.seek(rtp, committed.offset() + 1);
    }
{code}

> KafkaSpout doesn't retry failed tuples
> --
>
> Key: STORM-2077
> URL: https://issues.apache.org/jira/browse/STORM-2077
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.0.2
>Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
> Map props = new HashMap<>();
> props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
> props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
> ByteArrayDeserializer.class.getName());
> props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
> ByteArrayDeserializer.class.getName());
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> broker.bootstrapServer());
> KafkaSpoutStreams kafkaSpoutStreams = new 
> KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new 
> String[]{"test-topic"}).build();
> KafkaSpoutTuplesBuilder kafkaSpoutTuplesBuilder = new 
> KafkaSpoutTuplesBuilder.Builder<>(new 
> KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
> KafkaSpoutRetryService retryService = new 
> KafkaSpoutLoggedRetryExponentialBackoff(KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>  KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), 3, 
> KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
> KafkaSpoutConfig config = new 
> KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, 
> retryService)
> .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
> .setMaxUncommittedOffsets(30)
> .setOffsetCommitPeriodMs(10)
> .setMaxRetries(3)
> .build();
> kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple and we expect that those tuples will 
> all be replayed. But that's not the case for every tuple.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-2087) Storm-kafka-client: Failed tuples are not always replayed

2016-11-30 Thread Andreas Maier (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708199#comment-15708199
 ] 

Andreas Maier commented on STORM-2087:
--

Is this a duplicate of STORM-2077?

> Storm-kafka-client: Failed tuples are not always replayed 
> --
>
> Key: STORM-2087
> URL: https://issues.apache.org/jira/browse/STORM-2087
> Project: Apache Storm
>  Issue Type: Bug
>Reporter: Jeff Fenchel
>  Time Spent: 9h
>  Remaining Estimate: 0h
>
> I am working with kafka 10 and the storm-kafka-client from master. It appears 
> that tuples are not always being replayed when they are failed.
> With a topology that randomly fails tuples a small percentage of the time I 
> found that the committed kafka offset would get stuck and eventually 
> processing would stop even though the committed offset was nowhere near the 
> end of the topic. 
> I have also replicated the issue in unit tests with this PR: 
> https://github.com/apache/storm/pull/1679
> It seems that increasing the number of times I call nextTuple for the in-order 
> case will make it work, but it doesn't seem to help the case where 
> tuples are failed out of the order in which they were emitted. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)