[jira] [Updated] (STORM-2352) New Kafka spout retries forever even with retries of 5

2017-02-08 Thread Kishore Senji (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kishore Senji updated STORM-2352:
-
Description: 
v1.0.0 and above

KafkaSpout is created with a KafkaSpoutConfig having maxRetries of 5, yet the 
KafkaSpout still retries the failed tuple forever.
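
For reference, a hedged example of the configuration at issue (the builder signature and setMaxRetries are assumed from the 1.0.x storm-kafka-client API, not confirmed by this report):

{code}
// Assumed 1.0.x-style construction; setMaxRetries(5) is the setting being ignored.
KafkaSpoutConfig<String, String> spoutConf =
        new KafkaSpoutConfig.Builder<>(kafkaProps, kafkaSpoutStreams, tuplesBuilder, retryService)
                .setMaxRetries(5)
                .build();
{code}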

Reason:
numFails is incremented in the fail() method of KafkaSpout.
{code}
public void fail(Object messageId) {
    final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    emitted.remove(msgId);
    if (msgId.numFails() < maxRetries) {
        msgId.incrementNumFails();
        retryService.schedule(msgId);
    } else { // limit to max number of retries
        LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
        ack(msgId);
    }
}
{code}

However, emitTupleIfNotEmitted() creates a new KafkaSpoutMessageId and checks 
whether that msgId is ready to be emitted (in the failure case); if so, it 
emits the new msgId instance, thus losing the numFails accumulated the 
previous time:

{code}
private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
    final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);

    if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
        LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
    } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
        LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
    } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {
        // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
        final List<Object> tuple = tuplesBuilder.buildTuple(record);
        kafkaSpoutStreams.emit(collector, tuple, msgId);
        emitted.add(msgId);
        numUncommittedOffsets++;
        if (retryService.isReady(msgId)) { // has failed. Is it ready for retry?
            retryService.remove(msgId);  // re-emitted hence remove from failed
        }
        LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
    }
}
{code}
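
To make the loss concrete, here is an illustrative walkthrough (assuming, as the acked/emitted lookups above imply, that KafkaSpoutMessageId equality is based on the record's topic, partition, and offset):

{code}
// Illustrative only, not spout code: equality is by record identity, but
// numFails is per-instance state, so a fresh instance starts back at zero.
KafkaSpoutMessageId first = new KafkaSpoutMessageId(record);
first.incrementNumFails();          // what fail() did on the previous attempt
KafkaSpoutMessageId second = new KafkaSpoutMessageId(record);
assert first.equals(second);        // so retryService.isReady(second) can match
assert second.numFails() == 0;      // yet the retry counter has been lost
{code}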

isReady() has no side effects; it merely performs a lookup and returns the 
result. The fix is to either modify the RetryService interface to hand back 
the scheduled msgId, make isReady() side-effecting so it attaches the numFails 
from the previous attempt, or add a 'failed' collection to KafkaSpout to keep 
track of failed msgIds (similar to acked) and emit using the msgId stored 
there when isReady() is true. A sketch of the last option follows.
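
A minimal sketch of the last option, with a hypothetical 'failed' map (an illustration of the idea, not an actual patch):

{code}
// Hypothetical: remember failed msgIds, keyed by themselves, so the instance
// that carries numFails can be re-emitted instead of a fresh one.
private final Map<KafkaSpoutMessageId, KafkaSpoutMessageId> failed = new HashMap<>();

// in fail(), after incrementNumFails(): keep the instance holding the count
failed.put(msgId, msgId);

// in emitTupleIfNotEmitted(), before emitting: swap in the remembered instance
KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
final KafkaSpoutMessageId prior = failed.get(msgId);   // equal by topic/partition/offset
if (prior != null && retryService.isReady(prior)) {
    msgId = prior;          // retains numFails from the earlier failures
    failed.remove(prior);   // re-emitted, so it is no longer pending retry
}
{code}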

  was: (previous description; identical to the above except that its final 
paragraph listed only the first two proposed fixes)

[jira] [Created] (STORM-2352) New Kafka spout retries forever even with retries of 5

2017-02-08 Thread Kishore Senji (JIRA)
Kishore Senji created STORM-2352:


 Summary: New Kafka spout retries forever even with retries of 5
 Key: STORM-2352
 URL: https://issues.apache.org/jira/browse/STORM-2352
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-kafka-client
Affects Versions: 1.0.0, 1.1.0
Reporter: Kishore Senji


(Description identical to the [Updated] entry above.)





[jira] [Updated] (STORM-1464) storm-hdfs should support writing to multiple files

2017-02-08 Thread P. Taylor Goetz (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

P. Taylor Goetz updated STORM-1464:
---
Fix Version/s: 1.1.0

> storm-hdfs should support writing to multiple files
> ---
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-hdfs
>Reporter: Aaron Dossett
>Assignee: Aaron Dossett
>  Labels: avro
> Fix For: 2.0.0, 1.1.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different 
> file. Schema evolution is a common use of Avro, and the avro bolt should 
> support that seamlessly.
> - Partitioning output to different directories based on the tuple contents. 
> For example, if the tuple contains a "USER" field, it should be possible to 
> partition based on that value (see the sketch below).
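
The second example could look roughly like the following. This is a hedged illustration: the Partitioner interface, its getPartitionPath method, and the UserPartitioner class are assumptions about what the API could look like, not the merged implementation.

{code}
import org.apache.hadoop.fs.Path;
import org.apache.storm.tuple.Tuple;

// Assumed hook: map each tuple to a subdirectory under the bolt's base path.
public class UserPartitioner implements Partitioner {
    @Override
    public String getPartitionPath(Tuple tuple) {
        // e.g. .../user=alice/part-0000.avro vs .../user=bob/part-0000.avro
        return Path.SEPARATOR + "user=" + tuple.getStringByField("USER");
    }
}
{code}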





[jira] [Resolved] (STORM-2270) Kafka spout should consume from latest when zk committed offset bigger than latest offset

2017-02-08 Thread P. Taylor Goetz (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-2270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

P. Taylor Goetz resolved STORM-2270.

   Resolution: Fixed
Fix Version/s: 1.1.0

Merged to 1.x-branch.

> Kafka spout should consume from latest when zk committed offset bigger than 
> latest offset
> -
>
> Key: STORM-2270
> URL: https://issues.apache.org/jira/browse/STORM-2270
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.0.0, 0.9.6
>Reporter: Yuzhao Chen
>  Labels: easyfix
> Fix For: 1.1.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Kafka spout should consume from the latest offset when the ZK committed 
> offset is bigger than the latest offset (a TopicOffsetOutOfRangeException is 
> thrown), especially when a Kafka topic changes its leader and some data is 
> lost. If we consumed from the earliest offset, many meaningless duplicate 
> records would be re-consumed, so we should consume from the latest offset 
> instead (see the sketch below).
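
A hedged sketch of the handling described above, against the old storm-kafka spout. KafkaUtils.getOffset is the storm-kafka utility; readCommittedOffsetFromZk stands in for however the spout reads its stored offset and is an assumption, not a real method:

{code}
// Sketch only: if the stored offset is past the log end, fall back to the
// latest offset instead of rewinding to the earliest one.
long storedOffset = readCommittedOffsetFromZk(partition);   // assumed helper
long latestOffset = KafkaUtils.getOffset(consumer, topic, partitionId,
        kafka.api.OffsetRequest.LatestTime());
long startOffset = (storedOffset > latestOffset) ? latestOffset : storedOffset;
{code}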


