[jira] [Comment Edited] (STORM-2994) KafkaSpout consumes messages but doesn't commit offsets
[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16424583#comment-16424583 ]

Stig Rohde Døssing edited comment on STORM-2994 at 4/3/18 8:53 PM:
---
Thanks [~RAbreu], merged to the master, 1.x, 1.1.x and 1.0.x branches. Keep up the good work.

> KafkaSpout consumes messages but doesn't commit offsets
> ---
>
> Key: STORM-2994
> URL: https://issues.apache.org/jira/browse/STORM-2994
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 2.0.0, 1.1.2, 1.0.6, 1.2.1
> Reporter: Rui Abreu
> Assignee: Rui Abreu
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.0, 1.1.3, 1.0.7, 1.2.2
>
> Time Spent: 3h 40m
> Remaining Estimate: 0h
>
> A topology consumes from two different Kafka clusters: 0.10.1.1 and 0.10.2.1.
> Spouts consuming from 0.10.2.1 have a low lag (and regularly commit offsets).
> The Spout that consumes from 0.10.1.1 exhibits either:
> 1. Unknown lag
> 2. Lag that increments as the Spout reads messages from Kafka
>
> At DEBUG level, the offset manager logs "topic-partition has NO offsets ready to be committed", despite the spout continuing to consume messages.
> Several configuration tweaks were tried, including setting maxRetries to 1 in case messages with a lower offset were being retried (the logs didn't show it, though). offsetCommitPeriodMs was also lowered, to no avail.
> The only configuration that works is ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG=true, but this is undesired since we lose processing guarantees.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
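The configuration knobs named in the description (offsetCommitPeriodMs, maxRetries via the retry service, and the auto-commit workaround) map onto the storm-kafka-client builder roughly as sketched below. This is only an illustration: the broker address, topic name, and concrete values are placeholders, and minor builder-method details differ between 1.x releases.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;

// Sketch of the spout configuration under discussion (placeholder values).
KafkaSpoutConfig<String, String> spoutConf = KafkaSpoutConfig
    .builder("worker:9092", "sample.topic")
    // The commit period that was lowered while debugging.
    .setOffsetCommitPeriodMs(10_000)
    // maxRetries = 1, as tried in case lower offsets were being retried.
    .setRetry(new KafkaSpoutRetryExponentialBackoff(
        TimeInterval.milliSeconds(500), TimeInterval.milliSeconds(2),
        1, TimeInterval.seconds(10)))
    // Keeping auto-commit off preserves processing guarantees; setting it
    // to true is the workaround the reporter wants to avoid.
    .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
    .build();
```

This is a configuration fragment against the storm-kafka-client API, not runnable on its own without a Storm topology around it.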
[jira] [Comment Edited] (STORM-2994) KafkaSpout consumes messages but doesn't commit offsets
[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400333#comment-16400333 ]

RAbreu edited comment on STORM-2994 at 3/15/18 12:47 PM:
---
I've been injecting two types of messages into the Kafka topic:
* messages that are incorrect and won't be emitted by the Spout
* messages that are correct and will be emitted by the Spout

Using storm-kafka-client 1.1.0, even the messages that are emitted pile up and aren't committed:
{code:java}
2018-03-15T12:22:25.880Z o.a.s.k.s.KafkaSpout [DEBUG] - Polled [0] records from Kafka. [1379] uncommitted offsets across all topic partitions
{code}
The incorrect messages aren't added to the total of uncommitted messages.

Using storm-kafka-client 1.1.2, the behaviour is different. I uploaded the same topology, bumping storm-kafka-client to 1.1.2, picking up the previous lag of > 14000 and reducing it to ~3000. It didn't go to 0 and stayed firmly at that level. Once I started injecting correct messages, eventually all messages were committed (I'm not sure at this point whether that happens when the commit interval elapses).

Question: aren't the tuples for which
{code:java}
List<Object> apply(ConsumerRecord<K, V> record);
{code}
returns null supposed to be added to the list of uncommitted offsets?
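The `apply` method in question is the translator hook that decides whether a record becomes a tuple. A minimal sketch of a translator that filters records by returning null, against the storm-kafka-client 1.x `RecordTranslator` interface, might look as follows; the `"id"` predicate here is purely hypothetical, standing in for whatever makes a message "incorrect" in the reporter's topology.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Sketch: returning null from apply() tells the spout not to emit a tuple
// for this record ("Not emitting null tuple" in the logs). The filtering
// condition is a made-up example.
public class FilteringTranslator implements RecordTranslator<String, String> {
    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        if (record.value() == null || !record.value().contains("\"id\"")) {
            return null; // "incorrect" message: skip, no tuple emitted
        }
        return new Values(record.key(), record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("key", "value");
    }
}
```

The bug under discussion is precisely what should happen to the offsets of records for which this method returns null.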
[jira] [Comment Edited] (STORM-2994) KafkaSpout consumes messages but doesn't commit offsets
[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399611#comment-16399611 ]

RAbreu edited comment on STORM-2994 at 3/14/18 11:55 PM:
---
Sorry, I had switched back to storm-kafka-client 1.1.0 for a test and forgot to undo it.

storm-kafka-client 1.1.2 logs (note: the message was expected not to be emitted by the Spout, hence the "Not emitting null tuple"):
{code:java}
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Skipping fetch for partition sample.topic-0 because there is an in-flight request to worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.NetworkClient [TRACE] - Completed receive from node 1, for key 1, received {throttle_time_ms=0,responses=[{topic=sample.topic,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=38785},record_set=[(offset=38781,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)), (offset=38782,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)), (offset=38783,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes)), (offset=38784,record=Record(magic = 0, attributes = 0, compression = NONE, crc = 1115540089, key = 3 bytes, value = 285 bytes))]}]}]}
2018-03-14T23:06:50.777Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Adding fetched record for partition sample.topic-0 with offset 38781 to buffered record list
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Received 4 records in fetch response for partition sample.topic-0 with offset 38781
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Returning fetched records at offset 38781 for assigned partition sample.topic-0 and update position to 38785
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [DEBUG] - Ignoring fetched records for sample.topic-0 at offset 38781 since the current position is 38785
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [TRACE] - Added fetch request for partition sample.topic-0 at offset 38785 to node worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.c.i.Fetcher [DEBUG] - Sending fetch for partitions [sample.topic-0] to broker worker:9092 (id: 1 rack: null)
2018-03-14T23:06:50.777Z o.a.k.c.NetworkClient [TRACE] - Sending {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,topics=[{topic=sample.topic,partitions=[{partition=0,fetch_offset=38785,max_bytes=1048576}]}]} to node 1.
2018-03-14T23:06:50.777Z o.a.s.k.s.KafkaSpout [DEBUG] - Polled [4] records from Kafka
2018-03-14T23:06:50.778Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpout [DEBUG] - Not emitting null tuple for record [ConsumerRecord(topic = sample.topic, partition = 0, offset = 38781, NoTimestampType = -1, checksum = 1115540089, serialized key size = 3, serialized value size = 285, key = 123, value = {"id":"100"})] as defined in configuration.
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpout [DEBUG] - Received direct ack for message [{topic-partition=sample.topic-0, offset=38781, numFails=0, emitted=false}], associated with null tuple
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.779Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
2018-03-14T23:06:50.780Z o.a.s.k.s.KafkaSpout [DEBUG] - Not emitting null tuple for record [ConsumerRecord(topic = sample.topic, partition = 0, offset = 38782, NoTimestampType = -1, checksum = 1115540089, serialized key size = 3, serialized value size = 285, key = 123, value = {"id":"100"})] as defined in configuration.
2018-03-14T23:06:50.780Z o.a.s.k.s.KafkaSpout [DEBUG] - Received direct ack for message [{topic-partition=sample.topic-0, offset=38782, numFails=0, emitted=false}], associated with null tuple
2018-03-14T23:06:50.958Z o.a.s.k.s.KafkaSpout [TRACE] - No offsets to commit. KafkaSpout{offsetManagers={}, emitted=[]}
2018-03-14T23:06:50.958Z o.a.s.k.s.KafkaSpoutRetryExponentialBackoff [DEBUG] - Topic partitions with entries ready to be retried [{}]
{code}
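The commit behaviour behind these logs can be illustrated with a simplified model (this is not the actual Storm code): the spout can only advance a partition's committed offset across a contiguous run of acked offsets. If a filtered-out record is never acked, it leaves a hole that blocks every later offset from being committed; the "Received direct ack ... associated with null tuple" lines show the spout acking such records immediately so the commit can advance past them.

```java
import java.util.Set;
import java.util.TreeSet;

// Simplified, illustrative model of contiguous-offset committing.
public class OffsetCommitModel {
    // Returns the next offset to commit: one past the longest contiguous
    // run of acked offsets starting at the currently committed offset.
    public static long nextCommitOffset(long committed, Set<Long> acked) {
        long next = committed;
        while (acked.contains(next)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        Set<Long> acked = new TreeSet<>();
        // Offsets 0, 1 and 3 acked; offset 2 was filtered out and never acked.
        acked.add(0L); acked.add(1L); acked.add(3L);
        System.out.println(nextCommitOffset(0L, acked)); // stuck at 2

        // Direct-acking the filtered record closes the hole, so the commit
        // can advance past it.
        acked.add(2L);
        System.out.println(nextCommitOffset(0L, acked)); // advances to 4
    }
}
```

Under this model, a translator that returns null without the record being acked would keep the committed offset pinned below the hole, which matches the ever-growing lag reported above.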
[jira] [Comment Edited] (STORM-2994) KafkaSpout consumes messages but doesn't commit offsets
[ https://issues.apache.org/jira/browse/STORM-2994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397768#comment-16397768 ]

RAbreu edited comment on STORM-2994 at 3/13/18 10:26 PM:
---
Hello Stig. This was observed in both versions, but the vast majority of the tests I conducted were on 1.1.2. Let me know if you would like more details.