[jira] [Created] (STORM-2229) KafkaSpout does not resend failed tuples
Matthias Klein created STORM-2229:
-------------------------------------

             Summary: KafkaSpout does not resend failed tuples
                 Key: STORM-2229
                 URL: https://issues.apache.org/jira/browse/STORM-2229
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka-client
    Affects Versions: 1.0.0, 1.0.1, 1.0.2
            Reporter: Matthias Klein


When the topology fails a tuple, it is never resent by the KafkaSpout. This can easily be shown by constructing a small topology that fails every tuple.

Apparent reason:

{code}
public class KafkaSpout extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
            } else {
                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);  // Seek to last committed offset <== actually seeks to end of partition
            }
        }
    }
{code}

The code seeks to the end of the partition instead of seeking to the first uncommitted offset.

Preliminary fix (worked for me, but needs to be checked by an expert):

{code}
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
            } else {
                OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
                if (committed == null) {
                    // No offsets committed yet for this partition - start from the beginning
                    kafkaConsumer.seekToBeginning(toArrayList(rtp));
                } else {
                    // Seek to the first uncommitted offset
                    kafkaConsumer.seek(rtp, committed.offset() + 1);
                }
            }
        }
    }
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
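The branching in the preliminary fix can be modeled in isolation as a small pure function (a hypothetical helper for illustration only, not part of the KafkaSpout API): given the next offset that is ready to commit, if any, and the last committed offset, if any, it returns the offset to seek to, or null to mean "seek to the beginning of the partition".

```java
// Hypothetical helper mirroring the fix's branching; not Storm API.
class SeekTargetSketch {
    // nextCommitOffset: from findNextCommitOffset(), may be null
    // committedOffset:  from kafkaConsumer.committed(rtp), may be null
    // returns the offset to seek to, or null meaning "seek to beginning"
    static Long seekTarget(Long nextCommitOffset, Long committedOffset) {
        if (nextCommitOffset != null) {
            return nextCommitOffset + 1; // next offset ready to commit in the next commit cycle
        }
        if (committedOffset == null) {
            return null;                 // nothing committed yet for this partition
        }
        return committedOffset + 1;      // first uncommitted offset, not the end of the partition
    }
}
```

The key point is the last branch: the fix seeks to the entry just after the last committed offset, where the buggy code ends up at the end of the partition and silently skips the failed tuples.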
[jira] [Commented] (STORM-2077) KafkaSpout doesn't retry failed tuples
[ https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1573#comment-1573 ]

Matthias Klein commented on STORM-2077:
---------------------------------------

By the way: this issue is not assigned to the right component; it should be storm-kafka-client. Opening a new issue.

> KafkaSpout doesn't retry failed tuples
> --------------------------------------
>
>                 Key: STORM-2077
>                 URL: https://issues.apache.org/jira/browse/STORM-2077
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 1.0.2
>            Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
> Map props = new HashMap<>();
> props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
> props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
> props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.bootstrapServer());
> KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new String[]{"test-topic"}).build();
> KafkaSpoutTuplesBuilder kafkaSpoutTuplesBuilder = new KafkaSpoutTuplesBuilder.Builder<>(new KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
> KafkaSpoutRetryService retryService = new KafkaSpoutLoggedRetryExponentialBackoff(
>     KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>     KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>     3,
>     KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
> KafkaSpoutConfig config = new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, retryService)
>     .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
>     .setMaxUncommittedOffsets(30)
>     .setOffsetCommitPeriodMs(10)
>     .setMaxRetries(3)
>     .build();
> kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple and we expect that those tuples will all be replayed. But that's not the case for every tuple.


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
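The retry service in the configuration above is an exponential backoff with a 1 ms initial delay, a 1 ms delay period, 3 retries, and a 1 s cap. As a rough illustration of capped exponential backoff in general (a generic sketch; Storm's KafkaSpoutRetryExponentialBackoff may compute delays differently):

```java
// Generic capped exponential backoff (illustrative only; not Storm's exact formula).
class BackoffSketch {
    static long backoffMs(long initialMs, double factor, int retryCount, long maxMs) {
        double delay = initialMs * Math.pow(factor, retryCount);
        return Math.min((long) delay, maxMs); // never exceed the configured maximum
    }
}
```

With delays this short, the interesting part of the report is not the backoff timing but that some failed tuples are never offered for retry at all, which the comment below traces to the seek logic.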
[jira] [Resolved] (STORM-2224) Expose a method to override in computing the field from given tuple in FieldSelector
[ https://issues.apache.org/jira/browse/STORM-2224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana resolved STORM-2224.
-----------------------------------
    Resolution: Fixed

> Expose a method to override in computing the field from given tuple in FieldSelector
> ------------------------------------------------------------------------------------
>
>                 Key: STORM-2224
>                 URL: https://issues.apache.org/jira/browse/STORM-2224
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-cassandra
>    Affects Versions: 2.0.0, 1.x
>            Reporter: Satish Duggana
>            Assignee: Satish Duggana
>             Fix For: 2.0.0, 1.x
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> org.apache.storm.cassandra.query.selector.FieldSelector should give a way to customize computing the field value from a tuple.


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
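The shape of the change can be sketched with an overridable hook method (a hypothetical minimal model; the real storm-cassandra FieldSelector API may differ, and tuples are modeled here as a plain Map):

```java
import java.util.Map;

// Hypothetical minimal model of the hook described in the ticket.
class FieldSelectorSketch {
    protected final String field;

    FieldSelectorSketch(String field) { this.field = field; }

    // Overridable hook: default behavior reads the named field from the tuple.
    public Object getFieldValue(Map<String, Object> tuple) {
        return tuple.get(field);
    }
}

// A subclass customizes how the field value is computed from the tuple.
class UpperCasingSelector extends FieldSelectorSketch {
    UpperCasingSelector(String field) { super(field); }

    @Override
    public Object getFieldValue(Map<String, Object> tuple) {
        Object v = super.getFieldValue(tuple);
        return v == null ? null : v.toString().toUpperCase();
    }
}
```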
[jira] [Resolved] (STORM-2220) Adding config keys for CassandraBolts instead of taking at topology level configuration.
[ https://issues.apache.org/jira/browse/STORM-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Satish Duggana resolved STORM-2220.
-----------------------------------
    Resolution: Fixed

> Adding config keys for CassandraBolts instead of taking at topology level configuration.
> ----------------------------------------------------------------------------------------
>
>                 Key: STORM-2220
>                 URL: https://issues.apache.org/jira/browse/STORM-2220
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-cassandra
>    Affects Versions: 2.0.0, 1.x
>            Reporter: Satish Duggana
>            Assignee: Satish Duggana
>             Fix For: 2.0.0, 1.x
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently Cassandra bolts take the cassandra cluster configuration from the storm topology configuration. This is restrictive when a topology has two different cassandra bolts talking to different cassandra endpoints. Give a way to pass a cassandra conf to any cassandra bolt.


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Comment Edited] (STORM-2191) shorten classpaths in worker and LogWriter commands
[ https://issues.apache.org/jira/browse/STORM-2191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709779#comment-15709779 ]

Erik Weathers edited comment on STORM-2191 at 11/30/16 9:10 PM:
----------------------------------------------------------------

[~revans2], [~ptgoetz], [~kabhwan]: I'm working on fixing this issue, but I've noticed that in newer storm versions (as compared to 0.9.x) the {{get_jars_full}} logic has spread into the [core Storm Java code|https://github.com/apache/storm/blob/master/storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java#L330-L356] too, and there are [*more* uses|https://github.com/apache/storm/blob/285668742742da6316d38c1ef492c7c873b5a649/bin/storm.py#L131-L138] of {{get_jars_full}} within {{bin/storm.py}}. I do not understand the purpose of this logic and intend to submit a PR that replaces it with simply putting a wildcard in the dir that is being expanded. Without this change the commands get obscenely long (as explained in the Description of this ticket). Can you please let me know if this would be unacceptable? I *really* don't want to have to forever fork and hack the core storm repo in order to support running the storm daemons from a deep directory (as is necessitated when using non-dockerized Mesos). Thanks!! - Erik

> shorten classpaths in worker and LogWriter commands
> ---------------------------------------------------
>
>                 Key: STORM-2191
>                 URL: https://issues.apache.org/jira/browse/STORM-2191
>             Project: Apache Storm
>          Issue Type: Task
>          Components: storm-core
>    Affects Versions: 1.0.2
>            Reporter: Erik Weathers
>            Priority: Minor
>              Labels: cli, command-line
>
> When launching the worker daemon and its wrapping LogWriter daemon, the commands can become so long that they eclipse the default Linux limit of 4096 bytes. That results in commands that are cut off in {{ps}} output, and prevents easily inspecting the system to see even what processes are running. The specific scenario in which this problem can be easily triggered: *running Storm on Mesos*.
> h5. Details on why it happens:
> # Using the default Mesos containerizer instead of Docker containers causes the storm-mesos package to be unpacked into the Mesos executor sandbox.
> # The ["expand all jars on classpath"|https://github.com/apache/storm/blob/6dc6407a01d032483edebb1c1b4d8b69a304d81c/bin/storm.py#L114-L140] functionality in the {{bin/storm.py}} script causes every one of the jars that storm bundles into its lib directory to be explicitly listed in the command.
> #* e.g., say the mesos work dir is {{/var/run/mesos/work_dir/}}
> #* and say that the original classpath argument in the supervisor cmd includes the following for the {{lib/}} dir in the binary storm package:
> #** {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/*}}
> #* That leads to a hugely expanded classpath argument for the LogWriter and Worker daemons that get launched:
> #** {{/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/asm-4.0.jar:/var/run/mesos/work_dir/slaves/2357b762-6653-4052-ab9e-f1354d78991b-S12/frameworks/20160509-084241-1086985738-5050-32231-/executors/STORM_TOPOLOGY_ID/runs/e6a1407e-73fd-4be4-8d00-e882117b3391/storm-mesos-0.1.7-storm0.9.6-mesos0.28.2/lib/carbonite-1.4.0.jar:...}}


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
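The blow-up described above is easy to quantify: expanding {{lib/*}} repeats the (very deep) directory prefix once per jar, while a single wildcard entry stays short. A small sketch with made-up paths (the JVM accepts the {{dir/*}} wildcard form directly in {{-cp}}):

```java
import java.util.List;
import java.util.stream.Collectors;

// Compares the length of an expanded classpath against a single wildcard entry.
// Paths and jar names are invented for the example.
class ClasspathLengthSketch {
    static String expanded(String libDir, List<String> jars) {
        // What get_jars_full-style expansion produces: one absolute path per jar.
        return jars.stream().map(j -> libDir + "/" + j).collect(Collectors.joining(":"));
    }

    public static void main(String[] args) {
        String libDir = "/var/run/mesos/work_dir/slaves/S12/frameworks/F1/executors/TOPO/runs/R1/storm-mesos/lib";
        List<String> jars = List.of("asm-4.0.jar", "carbonite-1.4.0.jar", "clojure-1.7.0.jar");
        System.out.println("expanded: " + expanded(libDir, jars).length() + " chars");
        System.out.println("wildcard: " + (libDir + "/*").length() + " chars");
    }
}
```

With dozens of bundled jars and a Mesos-depth prefix, the expanded form easily crosses the 4096-byte limit mentioned in the ticket, whereas the wildcard form grows only with the directory depth.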
[jira] [Created] (STORM-2228) KafkaSpout does not replay properly when a topic maps to multiple streams
Robert Joseph Evans created STORM-2228:
---------------------------------------

             Summary: KafkaSpout does not replay properly when a topic maps to multiple streams
                 Key: STORM-2228
                 URL: https://issues.apache.org/jira/browse/STORM-2228
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka-client
    Affects Versions: 1.0.0, 2.0.0, 1.0.1, 1.0.2, 1.1.0, 1.0.3
            Reporter: Robert Joseph Evans
            Assignee: Robert Joseph Evans
            Priority: Blocker


In the example KafkaSpoutTopologyMainNamedTopics.java, the code creates a TuplesBuilder and a KafkaSpoutStreams:

{code}
    protected KafkaSpoutTuplesBuilder getTuplesBuilder() {
        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
                new TopicsTest0Test1TupleBuilder(TOPICS[0], TOPICS[1]),
                new TopicTest2TupleBuilder(TOPICS[2]))
            .build();
    }

    protected KafkaSpoutStreams getKafkaSpoutStreams() {
        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
        final Fields outputFields1 = new Fields("topic", "partition", "offset");
        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to test_stream
            .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})   // contents of topic test2 sent to test_stream
            .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
            .build();
    }
{code}

Essentially the code is trying to take {{TOPICS\[0]}}, {{TOPICS\[1]}}, and {{TOPICS\[2]}}, translate them to {{Fields("topic", "partition", "offset", "key", "value")}}, and output them on {{STREAMS\[0]}}. Then just for {{TOPICS\[2]}} they want it to be output as {{Fields("topic", "partition", "offset")}} to {{STREAMS\[2]}}. (Don't know what happened to {{STREAMS\[1]}}.)

There are two issues here. The first is with how the TupleBuilder and the SpoutStreams are split up but coupled: {{STREAMS\[2]}} is actually getting the full "topic" "partition" "offset" "key" "value", but this is minor.

The real issue is that the code uses the same KafkaSpoutMessageId for all the tuples emitted to both {{STREAMS\[0]}} and {{STREAMS\[2]}}.

https://git.corp.yahoo.com/storm/storm/blob/5bcbb8d6d700d0d238d23f8f6d3976667aaedab9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L284-L304

The code, however, is written to assume that it will only ever get one ack/fail for a given KafkaSpoutMessageId. This means that if one of the emitted tuple trees succeeds and then the other fails, the failure will not result in anything being replayed! This violates how storm is intended to work.

I discovered this as a part of STORM-2225, and I am fine with fixing it on STORM-2225 (I would just remove support for that functionality, because there are other ways of doing this correctly). But that would not maintain backwards compatibility and I am not sure it would be appropriate for 1.x releases. I really would like to have feedback from others on this. I can put something into 1.x where it will throw an exception if acking is enabled and this situation is present, but I don't want to spend the time trying to do reference counting on the number of tuples actually emitted. If someone else wants to do that I would be happy to turn this JIRA over to them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
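The reference-counting idea mentioned at the end of the ticket can be sketched as follows (hypothetical, not the KafkaSpout implementation): when one Kafka record is emitted as several tuples on different streams, the record is complete only once every emitted tuple has been acked, and any single failure marks the whole record for replay.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of per-record reference counting for a multi-stream emit.
class RefCountedMessageId {
    private final AtomicInteger pending; // tuples emitted but not yet acked
    private volatile boolean failed = false;

    RefCountedMessageId(int emittedTuples) {
        this.pending = new AtomicInteger(emittedTuples);
    }

    /** Returns true only when the last outstanding tuple is acked and nothing failed. */
    boolean ack() {
        return pending.decrementAndGet() == 0 && !failed;
    }

    /** Any failure means the whole record must be replayed. */
    void fail() {
        failed = true;
    }

    boolean shouldReplay() {
        return failed;
    }
}
```

Under this scheme the scenario from the ticket (one tuple tree succeeds, the other fails) would still mark the record for replay, instead of the failure being silently dropped.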
[jira] [Created] (STORM-2227) Better User Control of GC Options
Derek Dagit created STORM-2227:
-------------------------------

             Summary: Better User Control of GC Options
                 Key: STORM-2227
                 URL: https://issues.apache.org/jira/browse/STORM-2227
             Project: Apache Storm
          Issue Type: Improvement
          Components: storm-core
    Affects Versions: 0.10.3
            Reporter: Derek Dagit
            Priority: Minor


As a user, I would like to override certain JVM garbage collection options instead of overriding them all, so that I can make simpler, safer changes.

Currently, if a user wants to add some gc option in a topology, the user must copy everything from {{worker.gc.childopts}} to {{topology.worker.gc.childopts}} and make the needed edits/additions. This is error prone, since the provided cluster-wide options can change, and because they are overwritten by default. A user can easily override settings unwittingly by adding new options if they forget to also copy the cluster-wide settings.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
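The desired merge semantics could look like the following sketch (hypothetical; today {{topology.worker.gc.childopts}} simply replaces {{worker.gc.childopts}} wholesale): a topology-level option overrides the cluster-wide option with the same flag name, and all other cluster-wide options are inherited unchanged.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of flag-wise merging of GC child opts.
class GcOptsMergeSketch {
    // "-XX:MaxGCPauseMillis=200" -> "-XX:MaxGCPauseMillis"
    static String flagName(String opt) {
        int eq = opt.indexOf('=');
        return eq >= 0 ? opt.substring(0, eq) : opt;
    }

    static List<String> merge(List<String> clusterOpts, List<String> topoOpts) {
        Map<String, String> byFlag = new LinkedHashMap<>(); // preserves option order
        for (String o : clusterOpts) byFlag.put(flagName(o), o);
        for (String o : topoOpts)    byFlag.put(flagName(o), o); // override or append
        return new ArrayList<>(byFlag.values());
    }
}
```

With this behavior a user could change one pause-time target without copying, and risking drift from, the rest of the cluster-wide settings.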
[jira] [Resolved] (STORM-2174) Initial Scaffolding for a Beam Pipeline Runner
[ https://issues.apache.org/jira/browse/STORM-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

P. Taylor Goetz resolved STORM-2174.
------------------------------------
    Resolution: Fixed

Merged to beam-runner feature branch.

> Initial Scaffolding for a Beam Pipeline Runner
> ----------------------------------------------
>
>                 Key: STORM-2174
>                 URL: https://issues.apache.org/jira/browse/STORM-2174
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: P. Taylor Goetz
>            Assignee: P. Taylor Goetz
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Create a module under external with an implementation of the basic scaffolding, etc. for a beam runner.


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (STORM-1886) Extend KeyValueState interface with delete method
[ https://issues.apache.org/jira/browse/STORM-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arun Mahadevan updated STORM-1886:
----------------------------------
    Assignee: Balazs Kossovics

> Extend KeyValueState interface with delete method
> -------------------------------------------------
>
>                 Key: STORM-1886
>                 URL: https://issues.apache.org/jira/browse/STORM-1886
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: Balazs Kossovics
>            Assignee: Balazs Kossovics
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Even if the implementation of checkpointing only uses the get/put methods of the KeyValueState interface, the existence of a delete method could be really useful in the general case.
> I made a first implementation; what do you think about it?


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
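The proposed extension can be sketched as an interface plus an in-memory implementation (illustrative only; the real org.apache.storm.state API may differ): delete removes the mapping and returns the previous value, if any.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a key-value state interface extended with delete.
interface KeyValueStateSketch<K, V> {
    void put(K key, V value);
    V get(K key);
    V delete(K key); // proposed addition: remove and return the previous value
}

// Minimal in-memory implementation for illustration.
class InMemoryKeyValueState<K, V> implements KeyValueStateSketch<K, V> {
    private final Map<K, V> map = new HashMap<>();
    public void put(K key, V value) { map.put(key, value); }
    public V get(K key)             { return map.get(key); }
    public V delete(K key)          { return map.remove(key); }
}
```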
[jira] [Comment Edited] (STORM-2077) KafkaSpout doesn't retry failed tuples
[ https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708652#comment-15708652 ] Matthias Klein edited comment on STORM-2077 at 11/30/16 1:54 PM: - The problem seems to be the following piece of code: {code} public class KafkaSpout extends BaseRichSpout { //... private void doSeekRetriableTopicPartitions() { //... if (offsetAndMeta != null) { kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle } else { kafkaConsumer.seekToEnd(toArrayList(rtp));// Seek to last committed offset <-- Indeed seeks to end of partition } {code} However, this code seeks to the end of the partition, not to the last committed offset. The following change seems to repair the issue: {code} if (offsetAndMeta != null) { kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle } else { OffsetAndMetadata committed = kafkaConsumer.committed(rtp); if(committed == null) { // Nothing yet committed kafkaConsumer.seekToBeginning(toArrayList(rtp)); } else { // seek to entry after last committed offset kafkaConsumer.seek(rtp, committed.offset() + 1); } {code} was (Author: db3f): The problem seems to be the following piece of code: {code} public class KafkaSpout extends BaseRichSpout { //... private void doSeekRetriableTopicPartitions() { //... if (offsetAndMeta != null) { kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle } else { kafkaConsumer.seekToEnd(toArrayList(rtp));// Seek to last committed offset <-- Ideed seeks to end of partition } {code} However, this code seeks to the end of the partition, not to the last committed offset. 
The following change seems to repair the issue:
{code}
if (offsetAndMeta != null) {
    kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in the next commit cycle
} else {
    OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
    if (committed == null) {
        // Nothing committed yet
        kafkaConsumer.seekToBeginning(toArrayList(rtp));
    } else {
        // seek to entry after last committed offset
        kafkaConsumer.seek(rtp, committed.offset() + 1);
    }
}
{code}

> KafkaSpout doesn't retry failed tuples
> --------------------------------------
>
>                Key: STORM-2077
>                URL: https://issues.apache.org/jira/browse/STORM-2077
>            Project: Apache Storm
>         Issue Type: Bug
>         Components: storm-kafka
>   Affects Versions: 1.0.2
>           Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
> Map<String, Object> props = new HashMap<>();
> props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
> props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
> props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.bootstrapServer());
> KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new String[]{"test-topic"}).build();
> KafkaSpoutTuplesBuilder kafkaSpoutTuplesBuilder = new KafkaSpoutTuplesBuilder.Builder<>(new KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
> KafkaSpoutRetryService retryService = new KafkaSpoutLoggedRetryExponentialBackoff(KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), 3, KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
> KafkaSpoutConfig config = new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, retryService)
>     .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
>     .setMaxUncommittedOffsets(30)
>     .setOffsetCommitPeriodMs(10)
>     .setMaxRetries(3)
>     .build();
> kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple and we expect that those tuples will all be replayed. But that's not the case for every tuple.
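The difference between the two seek targets discussed above can be shown without a broker. The sketch below is a plain-Java model with invented names (`SeekModel`, `pollFrom`); it is not the spout's real code, which calls `KafkaConsumer.seekToEnd` and `KafkaConsumer.seek`. It simulates a partition with offsets 0..9 where offset 3 is the last committed offset, and compares what a consumer would poll after each kind of seek.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one partition: offsets 0..9 exist, the spout last
// committed offset 3, and offsets 4..9 failed and are awaiting retry.
public class SeekModel {
    static final int LOG_END_OFFSET = 10; // position after the last record
    static final long COMMITTED = 3;      // last offset committed by the spout

    // Records a consumer would poll after being positioned at `position`.
    static List<Long> pollFrom(long position) {
        List<Long> polled = new ArrayList<>();
        for (long offset = position; offset < LOG_END_OFFSET; offset++) {
            polled.add(offset);
        }
        return polled;
    }

    public static void main(String[] args) {
        // Buggy behavior: seekToEnd positions the consumer at the log end,
        // so nothing is polled and the failed tuples are never re-emitted.
        System.out.println("seekToEnd:           " + pollFrom(LOG_END_OFFSET));

        // Proposed fix: seek to the first uncommitted offset, so the failed
        // records 4..9 are polled again and can be retried.
        System.out.println("seek(committed + 1): " + pollFrom(COMMITTED + 1));
    }
}
```

This matches the comment mismatch flagged in the ticket: the code's inline comment claims "Seek to last committed offset" while `seekToEnd` actually moves past every retriable record.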
[jira] [Commented] (STORM-2077) KafkaSpout doesn't retry failed tuples
[ https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708652#comment-15708652 ]

Matthias Klein commented on STORM-2077:
---------------------------------------
The problem seems to be the following piece of code:
{code}
public class KafkaSpout extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        //...
        if (offsetAndMeta != null) {
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in the next commit cycle
        } else {
            kafkaConsumer.seekToEnd(toArrayList(rtp)); // Seek to last committed offset <-- indeed seeks to the end of the partition
        }
{code}
However, this code seeks to the end of the partition, not to the last committed offset. The following change seems to repair the issue:
{code}
if (offsetAndMeta != null) {
    kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in the next commit cycle
} else {
    OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
    if (committed == null) {
        // Nothing committed yet
        kafkaConsumer.seekToBeginning(toArrayList(rtp));
    } else {
        // seek to entry after last committed offset
        kafkaConsumer.seek(rtp, committed.offset() + 1);
    }
}
{code}

> KafkaSpout doesn't retry failed tuples
> --------------------------------------
>
>                Key: STORM-2077
>                URL: https://issues.apache.org/jira/browse/STORM-2077
>            Project: Apache Storm
>         Issue Type: Bug
>         Components: storm-kafka
>   Affects Versions: 1.0.2
>           Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
> Map<String, Object> props = new HashMap<>();
> props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
> props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
> props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.bootstrapServer());
> KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new String[]{"test-topic"}).build();
> KafkaSpoutTuplesBuilder kafkaSpoutTuplesBuilder = new KafkaSpoutTuplesBuilder.Builder<>(new KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
> KafkaSpoutRetryService retryService = new KafkaSpoutLoggedRetryExponentialBackoff(KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), 3, KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
> KafkaSpoutConfig config = new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, retryService)
>     .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
>     .setMaxUncommittedOffsets(30)
>     .setOffsetCommitPeriodMs(10)
>     .setMaxRetries(3)
>     .build();
> kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple and we expect that those tuples will all be replayed. But that's not the case for every tuple.
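The expected accounting behind the reporter's `.setMaxRetries(3)` can be sketched in plain Java. This is a hypothetical model (`RetryModel` is an invented name, and "maxRetries means up to three replays after the initial emit" is an assumption about the config's semantics, not something the ticket states): a tuple whose every emit fails should be seen downstream four times before the spout gives up, which is what the reporter's topology does not observe.

```java
// Hypothetical model of the retry accounting for an always-failing tuple
// under setMaxRetries(3). Not Storm code; names and semantics are assumptions.
public class RetryModel {
    static final int MAX_RETRIES = 3;

    // How many times one tuple is emitted when the downstream bolt fails it
    // every time: the initial attempt plus MAX_RETRIES replays.
    static int emitsUntilDropped() {
        int failures = 0;
        int emits = 0;
        while (failures <= MAX_RETRIES) {
            emits++;     // spout emits (or re-emits) the tuple
            failures++;  // downstream bolt fails it
        }
        return emits;
    }

    public static void main(String[] args) {
        System.out.println(emitsUntilDropped()); // 4 = 1 initial emit + 3 retries
    }
}
```

The bug report is that some tuples never reach this count: once the spout seeks to the end of the partition, their replays are silently skipped regardless of the retry budget.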
[jira] [Commented] (STORM-2087) Storm-kafka-client: Failed tuples are not always replayed
[ https://issues.apache.org/jira/browse/STORM-2087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708199#comment-15708199 ]

Andreas Maier commented on STORM-2087:
--------------------------------------
Is this a duplicate of STORM-2077?

> Storm-kafka-client: Failed tuples are not always replayed
> ---------------------------------------------------------
>
>                Key: STORM-2087
>                URL: https://issues.apache.org/jira/browse/STORM-2087
>            Project: Apache Storm
>         Issue Type: Bug
>           Reporter: Jeff Fenchel
>         Time Spent: 9h
> Remaining Estimate: 0h
>
> I am working with Kafka 0.10 and the storm-kafka-client from master. It appears
> that tuples are not always being replayed when they are failed.
> With a topology that randomly fails tuples a small percentage of the time, I
> found that the committed Kafka offset would get stuck and eventually
> processing would stop, even though the committed offset was nowhere near the
> end of the topic.
> I have also replicated the issue in unit tests with this PR:
> https://github.com/apache/storm/pull/1679
> It seems that increasing the number of times I call nextTuple for the in-order
> case will make it work, but it doesn't seem to help the case where
> tuples are failed out of the order in which they were emitted.
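The "committed offset gets stuck" symptom with out-of-order failures follows from how a spout like this can only commit a contiguous prefix of acked offsets. The sketch below is a simplified model with invented names (`CommitModel`; the method name echoes the `findNextCommitOffset` seen in the spout code, but this is not the real implementation): a single failed offset in the middle of a run of acks blocks the commit, and if that offset is never replayed (the bug above), the commit never advances.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the spout's commit rule: the committed offset may only
// advance across a contiguous run of acked offsets. A hole left by a failed,
// never-replayed tuple pins the commit in place.
public class CommitModel {
    // Next commit offset given the last committed offset and the acked set.
    static long findNextCommitOffset(long committed, Set<Long> acked) {
        long next = committed;
        while (acked.contains(next + 1)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        // Offsets 1,2,4,5,6 acked; offset 3 failed out of order.
        Set<Long> acked = new TreeSet<>(Set.of(1L, 2L, 4L, 5L, 6L));
        System.out.println(findNextCommitOffset(0, acked)); // 2: stuck at the hole

        // Once offset 3 is replayed and acked, the commit can advance past it.
        acked.add(3L);
        System.out.println(findNextCommitOffset(0, acked)); // 6
    }
}
```

Under this model the two tickets are consistent: STORM-2077's seek-to-end bug prevents the replay of the failed offset, which in turn produces STORM-2087's stuck committed offset.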