[jira] [Comment Edited] (STORM-2994) KafkaSpout consumes messages but doesn't commit offsets

2018-04-03 Thread JIRA

[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424583#comment-16424583 ]

Stig Rohde Døssing edited comment on STORM-2994 at 4/3/18 8:53 PM:
---

Thanks [~RAbreu], merged to master, 1.x, 1.1.x and 1.0.x branches. Keep up the 
good work.


was (Author: srdo):
Thanks [~RAbreu], merged to master, 1.x, 1.1.x and 1.0.x branches.

> KafkaSpout consumes messages but doesn't commit offsets
> ---
>
> Key: STORM-2994
> URL: https://issues.apache.org/jira/browse/STORM-2994
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.1.2, 1.0.6, 1.2.1
>Reporter: Rui Abreu
>Assignee: Rui Abreu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.1.3, 1.0.7, 1.2.2
>
>  Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> A topology consumes from two different Kafka clusters: 0.10.1.1 and 
> 0.10.2.1.
> Spouts consuming from 0.10.2.1 have low lag (and regularly commit offsets). 
> The Spout that consumes from 0.10.1.1 exhibits either:
> 1- Unknown lag
> 2- Lag that increments as the Spout reads messages from Kafka
>  
> In DEBUG, the offset manager logs "topic-partition has NO offsets ready to be 
> committed", despite continuing to consume messages.
> Several configuration tweaks were tried, including setting maxRetries to 1 in 
> case messages with a lower offset were being retried (the logs didn't show 
> that, though).
> offsetCommitPeriodMs was also lowered, to no avail.
> The only configuration that works is 
> ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG=true, but this is undesired since 
> we lose processing guarantees.
>  
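For reference, the two knobs named in the description would be set roughly as follows. This is a hedged sketch, not the reporter's actual code: the builder method names (builder, setProp, setOffsetCommitPeriodMs) follow recent storm-kafka-client versions and may differ in the affected 1.0.x/1.1.x releases.

```java
// Sketch, not a recommendation: auto-commit masks the symptom but gives up
// the spout's at-least-once guarantee, as the report notes.
KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig
    .builder("worker:9092", "sample.topic")
    // The only setting that made commits happen in the report:
    .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
    // One of the tweaks tried to no avail: committing more often.
    .setOffsetCommitPeriodMs(10_000)
    .build();
```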



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (STORM-2994) KafkaSpout consumes messages but doesn't commit offsets

2018-03-15 Thread RAbreu (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400333#comment-16400333 ]

RAbreu edited comment on STORM-2994 at 3/15/18 12:47 PM:
-

I've been injecting two types of messages into the Kafka topic:
 * messages that are incorrect and won't be emitted by the Spout
 * messages that are correct and will be emitted by the Spout

Using storm-kafka-client 1.1.0, even the messages that are emitted pile up and 
aren't committed.

 

 
{code:java}
2018-03-15T12:22:25.880Z o.a.s.k.s.KafkaSpout [DEBUG] - Polled [0] records from Kafka. [1379] uncommitted offsets across all topic partitions
{code}
The incorrect messages aren't added to the total of uncommitted messages.

Using storm-kafka-client 1.1.2, the behaviour is different.

I uploaded the same topology, bumping storm-kafka-client to 1.1.2; it picked up 
the previous lag of > 14000 and reduced it to ~3000.
 It didn't go to 0 and stayed firmly at that level.
 Once I started injecting correct messages, eventually all messages were 
committed (not sure at this point whether that happens when the commit 
interval elapses).

Question:

Aren't the tuples for which
{code:java}
List<Object> apply(ConsumerRecord<K, V> record);
{code}
returns null supposed to be added to the list of uncommitted offsets?
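On the commit mechanics behind this question: KafkaSpout commits only the longest contiguous run of acked offsets, so a single offset that is polled but never acked blocks every later offset. A minimal, self-contained model of that rule (the class and method names here are invented for illustration; this is not the actual OffsetManager code):

```java
import java.util.Set;
import java.util.TreeSet;

public class OffsetTracker {
    /**
     * Given the last committed offset and the set of acked (fully processed)
     * offsets, return the next offset to commit: one past the end of the
     * contiguous run of acked offsets starting at committed + 1. Any gap
     * blocks everything after it from being committed.
     */
    public static long nextCommitOffset(long committed, Set<Long> acked) {
        long next = committed + 1;
        TreeSet<Long> sorted = new TreeSet<>(acked);
        while (sorted.contains(next)) {
            next++;
        }
        return next; // first offset NOT yet safe to commit
    }

    public static void main(String[] args) {
        // Offsets 4 and 5 acked, 6 missing, 7 acked: the commit stops at 6.
        TreeSet<Long> acked = new TreeSet<>(Set.of(4L, 5L, 7L));
        System.out.println(nextCommitOffset(3, acked)); // prints 6
        acked.add(6L); // once the gap is acked, 7 becomes committable too
        System.out.println(nextCommitOffset(3, acked)); // prints 8
    }
}
```

Under this model, records whose translation returns null must still be acked somehow (even without an emit), or they leave a permanent gap and no later offset is ever committed.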

 

 


was (Author: rabreu):
I've been injecting two types of messages into the Kafka topic:
 * messages that are incorrect and won't be emitted by the Spout
 * messages that are correct and will be emitted by the Spout

Using storm-kafka-client 1.1.0, even the messages that are emitted pile up and 
aren't committed.

 

 
{code:java}
2018-03-15T12:22:25.880Z o.a.s.k.s.KafkaSpout [DEBUG] - Polled [0] records from Kafka. [1379] uncommitted offsets across all topic partitions
{code}
The incorrect messages aren't added to the total of uncommitted messages.

Using storm-kafka-client 1.1.2, the behaviour is different.

I uploaded the same topology, bumping storm-kafka-client to 1.2.1; it picked up 
the previous lag of > 14000 and reduced it to ~3000.
 It didn't go to 0 and stayed firmly at that level.
 Once I started injecting correct messages, eventually all messages were 
committed (not sure at this point whether that happens when the commit 
interval elapses).

Question:

Aren't the tuples for which
{code:java}
List<Object> apply(ConsumerRecord<K, V> record);
{code}
returns null supposed to be added to the list of uncommitted offsets?

 

 



[jira] [Comment Edited] (STORM-2994) KafkaSpout consumes messages but doesn't commit offsets

2018-03-14 Thread RAbreu (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399611#comment-16399611 ]

RAbreu edited comment on STORM-2994 at 3/14/18 11:55 PM:
-

Sorry, I had switched storm-kafka-client back to 1.1.0 for a test and forgot 
to undo that.

storm-kafka-client 1.1.2 logs:

(Note: the message was expected not to be emitted by the Spout, hence the "Not 
emitting null tuple")
{code:java}
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Skipping fetch for partition sample.topic-0 because there is an in-flight request to worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.NetworkClient [TRACE] - Completed receive from node 1, for key 1, received {throttle_time_ms=0,responses=[{topic=sample.topic,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=38785},record_set=[(offset=38781,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)), (offset=38782,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)), (offset=38783,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)), (offset=38784,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes))]}]}]}
2018-03-14T23:06:50.777Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Adding fetched record for partition sample.topic-0 with offset 38781 to buffered record list
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Received 4 records in fetch response for partition sample.topic-0 with offset 38781
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Returning fetched records at offset 38781 for assigned partition sample.topic-0 and update position to 38785
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [DEBUG] - Ignoring fetched records for sample.topic-0 at offset 38781 since the current position is 38785
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Added fetch request for partition sample.topic-0 at offset 38785 to node worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [DEBUG] - Sending fetch for partitions [sample.topic-0] to broker worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.NetworkClient [TRACE] - Sending {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,topics=[{topic=sample.topic,partitions=[{partition=0,fetch_offset=38785,max_bytes=1048576}]}]} to node 1.
2018-03-14T23:06:50.777Z o.a.s.k.s.KafkaSpout [DEBUG] - Polled [4] records from Kafka
2018-03-14T23:06:50.778Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpout [DEBUG] - Not emitting null tuple for record [ConsumerRecord(topic = sample.topic, partition = 0, offset = 38781, NoTimestampType = -1, checksum = 1115540089, serialized key size = 3, serialized value size = 285, key = 123, value = {"id":"100"})] as defined in configuration.
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpout [DEBUG] - Received direct ack for message [{topic-partition=sample.topic-0, offset=38781, numFails=0, emitted=false}], associated with null tuple
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.780Z o.a.s.k.s.KafkaSpout [DEBUG] - Not emitting null tuple for record [ConsumerRecord(topic = sample.topic, partition = 0, offset = 38782, NoTimestampType = -1, checksum = 1115540089, serialized key size = 3, serialized value size = 285, key = 123, value = {"id":"100"})] as defined in configuration.
2018-03-14T23:06:50.780Z o.a.s.k.s.KafkaSpout [DEBUG] - Received direct ack for message [{topic-partition=sample.topic-0, offset=38782, numFails=0, emitted=false}], associated with null tuple
2018-03-14T23:06:50.958Z o.a.s.k.s.KafkaSpout [TRACE] - No offsets to commit. KafkaSpout{offsetManagers ={}, emitted=[]}
2018-03-14T23:06:50.958Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
{code}
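The "Not emitting null tuple ... Received direct ack" pair in the log above shows the key behaviour: a record the translator maps to null is skipped for emission, but its offset is still acked immediately, so it cannot block commits. A hypothetical, self-contained sketch of that flow (all names invented; the real spout tracks this in its OffsetManager):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class NullTupleAckSketch {
    // Stand-in for the record translator: null means "do not emit".
    static List<Object> translate(String value) {
        return value.contains("\"id\"") ? List.<Object>of(value) : null;
    }

    public static void main(String[] args) {
        TreeSet<Long> acked = new TreeSet<>();
        List<Object> emitted = new ArrayList<>();
        String[] values = {"{\"id\":\"100\"}", "garbage", "{\"id\":\"101\"}"};
        long offset = 38781;
        for (String v : values) {
            List<Object> tuple = translate(v);
            if (tuple == null) {
                acked.add(offset); // direct ack: committable without an emit
            } else {
                emitted.add(tuple);
                acked.add(offset); // simplification: downstream acks at once
            }
            offset++;
        }
        System.out.println(acked.size());   // prints 3: every offset acked
        System.out.println(emitted.size()); // prints 2: only valid records emitted
    }
}
```

With every offset acked, the acked run stays contiguous and the commit can advance past the dropped record.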
 

 


was (Author: rabreu):
Sorry, I had switched storm-kafka-client back to 1.1.0 for a test and forgot 
to undo that.

storm-kafka-client 1.1.2 logs:

(Note: the message was expected not to be emitted by the Spout, hence the "Not 
emitting null tuple")
{code:java}
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Skipping fetch for partition sample.topic-0 because there is an in-flight request to worker:9092 (id: 1 rack: 
{code}

[jira] [Comment Edited] (STORM-2994) KafkaSpout consumes messages but doesn't commit offsets

2018-03-13 Thread RAbreu (JIRA)

[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397768#comment-16397768 ]

RAbreu edited comment on STORM-2994 at 3/13/18 10:26 PM:
-

Hello Stig. This was observed in both versions, but the vast majority of the 
tests I conducted were on 1.1.2. Let me know if you would like more details.


was (Author: rabreu):
Hello Stuff. This was observed in both versions, but the vast majority of tests 
I conducted was on 1.1.2. Let me know if you would like to know more details.
