[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527100#comment-15527100 ]

Michael Coon commented on KAFKA-4224:
--------------------------------------

Correction to my previous comment: that should have read "cannot copy/paste the stack trace", not "can".
[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527098#comment-15527098 ]

Michael Coon commented on KAFKA-4224:
--------------------------------------

Gotcha. Unfortunately, the system throwing the exception is an isolated system, and I cannot copy/paste the stack trace. I would need to test 0.10.0.1 to see whether the new record parsing code would throw a different exception. I still don't believe that code would throw the detail I would need (i.e., offset/partition).
[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527018#comment-15527018 ]

Michael Coon commented on KAFKA-4224:
--------------------------------------

As I said, line 278 of NetworkClient (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java) is where the stack trace is being logged. It's catching java.lang.Exception, so anything that is an Exception (as opposed to an Error) would be caught and just logged. I have no visibility into a problem in my consumer.poll() call. I just get back empty records (or, in the case of a consumer with multiple partitions assigned, one partition that never makes progress). If, instead of catching java.lang.Exception, the Fetcher callback invoked on line 278 could throw a more detailed exception, and NetworkClient could pass that up to my app code somehow, that's all I would need to know that something is wrong with a particular partition at a particular offset. I understand why the code punts and just logs the problem--at that point it doesn't know which partitions are failing for the body of the ClientResponse. That doesn't get fleshed out until further down in the callback code. That's why something further down would have to pass back some kind of partial status, indicating which partitions failed, for what reason, and at what offset. That would be ideal. But again, I get why they just log it at this point--there is no mechanism for partial results. As it stands, if my consumer is managing multiple partitions, it has no way of knowing there is a problem with a particular partition unless it checks the progress of all partitions after each poll call. That's the part that's highly inefficient.
[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526908#comment-15526908 ]

Michael Coon commented on KAFKA-4224:
--------------------------------------

Line 278 of NetworkClient attempts to parse the record by calling the Fetcher response callback. It wraps that callback in a try/catch, catches "Exception", and just logs it. It does NOT throw it back to me. What would be awesome is if, when a corrupted record is found, some "CorruptedRecordException" were thrown back to me, so that I could interrogate the exception for the corrupted offset/topic/partition and either skip it, log it, do something magical, whatever. That gives me the ability to handle the actual situation instead of guessing what I should be doing with the problem. The real challenge is that a consumer assigned multiple partitions needs to check offset progress for all partitions and then attempt to advance those that have not made progress--which may or may not be due to corrupted records. It's not clean/efficient/clear at all.
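A minimal sketch of what such an exception might look like; the class name, constructor, and accessors are all hypothetical, since no such type existed in the 0.10.0.1 client (much later client versions did add a real analogue, RecordDeserializationException, which exposes the failing partition and offset):

{code:java}
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

// Hypothetical exception type proposed in this comment; not a real
// 0.10.0.1 client API.
public class CorruptedRecordException extends KafkaException {
    private final TopicPartition partition;
    private final long offset;

    public CorruptedRecordException(TopicPartition partition, long offset, Throwable cause) {
        super("Corrupted record at " + partition + ", offset " + offset, cause);
        this.partition = partition;
        this.offset = offset;
    }

    // The application can interrogate these to decide whether to skip the record.
    public TopicPartition partition() { return partition; }
    public long offset() { return offset; }
}
{code}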
[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526189#comment-15526189 ]

Michael Coon commented on KAFKA-4224:
--------------------------------------

I think this catches the problem with IndexOutOfBounds, but it would still yield a caught exception at the NetworkClient level. Basically, the challenge is that the exception is being caught in the NetworkClient, and I ultimately get back a ConsumerRecords with a count of 0. I have no way of knowing that there was a problem, so I keep trying. Oddly, even though subsequent fetch requests should theoretically move the position in the partition forward, I keep getting the error over and over. I could keep moving the offset forward until success, but that seems likely to introduce other problems, or to skip records if recovery takes too many attempts.
[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526080#comment-15526080 ]

Michael Coon commented on KAFKA-4224:
--------------------------------------

After doing more digging, it is actually a larger problem: the consumer makes no progress past the current (corrupted) offset. The poll call eventually returns empty records, but when I poll again, it merely attempts to pull back the same corrupted messages. I basically have to build a watchdog that detects the lack of forward progress and skips offsets accordingly (see the sketch below).
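A minimal sketch of such a stall detector, assuming a single-threaded poll loop (KafkaConsumer is not safe for concurrent access, so the check runs in the polling thread rather than a literal separate watchdog thread); the class name and stall threshold are illustrative:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Illustrative stall detector: if a partition's position has not advanced
 * after several polls, seek past the presumably corrupted record.
 */
public class StallSkippingPoller {
    private static final int MAX_STALLED_POLLS = 5; // illustrative threshold

    private final KafkaConsumer<byte[], byte[]> consumer;
    private final Map<TopicPartition, Long> lastPositions = new HashMap<>();
    private final Map<TopicPartition, Integer> stalledPolls = new HashMap<>();

    public StallSkippingPoller(KafkaConsumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    public ConsumerRecords<byte[], byte[]> poll(long timeoutMs) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(timeoutMs);
        for (TopicPartition tp : consumer.assignment()) {
            long position = consumer.position(tp);
            Long last = lastPositions.put(tp, position);
            if (last != null && last == position) {
                int stalls = stalledPolls.merge(tp, 1, Integer::sum);
                // Caveat: an idle partition with no incoming data also looks
                // stalled; a real implementation would need to distinguish the
                // two cases (e.g., by also checking the log end offset).
                if (stalls >= MAX_STALLED_POLLS) {
                    consumer.seek(tp, position + 1); // skip the stuck offset
                    stalledPolls.put(tp, 0);
                }
            } else {
                stalledPolls.put(tp, 0); // progress was made; reset the counter
            }
        }
        return records;
    }
}
{code}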
[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
[ https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525951#comment-15525951 ]

Michael Coon commented on KAFKA-4224:
--------------------------------------

I would love to give you a stack trace, but I unfortunately cannot provide one because the system with the error is in an isolated environment.
[jira] [Created] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
Michael Coon created KAFKA-4224:
-----------------------------------

             Summary: IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
                 Key: KAFKA-4224
                 URL: https://issues.apache.org/jira/browse/KAFKA-4224
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.10.0.1
            Reporter: Michael Coon

For whatever reason, I seem to have a corrupted message returned from a broker that puts the consumer into an infinite loop. org.apache.kafka.client.consumer.internals.Fetcher (line 590) gets the next record from the RecordsIterator of MemoryRecords, but when it attempts to decode the record, it throws an "IndexOutOfBounds" exception. Unfortunately, that exception is merely logged, and the Fetcher goes on to get the next message. But the exception apparently does not move the underlying buffer read position forward in a way that would actually reach the next record. The result: it keeps trying to read the corrupted record but can't make progress.

I offer two potential solutions:
1) Throw the exception up to me and let me figure out whether I want to skip forward in offsets, as sketched below.
2) Make sure the underlying RecordsIterator actually moves forward on exceptions so that progress can be made when corrupted messages are found.
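A sketch of what solution 1 could look like from the application side, reusing the hypothetical CorruptedRecordException sketched in the comments above; 'consumer', 'running', and 'process' are assumed application state:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical handling for solution 1. CorruptedRecordException is not a
// real 0.10.0.1 API; it is the exception type proposed in this ticket.
while (running) {
    try {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
        process(records); // normal application logic
    } catch (CorruptedRecordException e) {
        // Skip forward past the bad record and keep consuming.
        consumer.seek(e.partition(), e.offset() + 1);
    }
}
{code}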
[jira] [Commented] (KAFKA-3564) Count metric always increments by 1.0
[ https://issues.apache.org/jira/browse/KAFKA-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15248385#comment-15248385 ]

Michael Coon commented on KAFKA-3564:
--------------------------------------

If you look at the implementation of Count, it only increments by 1.0 each time I call record (which calls Count's update via SampleStat). So it will only ever increment my count by 1, no matter what value I pass to record. For Total, you'll notice it overrides the record and measure methods and does NOT ever reset its total. So if I want messages-in per second, I can't do new Rate(new Count()), because that will only ever increment its total by 1.0 per call. I have sub-elements of data coming into my processors that I want to use as the increment for the count. I would have to iterate through all of them and increment the count for each one--wasting cycles. Instead, I want to do something like "msgsIn.record(incoming.size())" and be done with it (see the sketch below). If I use new Rate(new Total()), the total is never reset, so the "rate" would just keep climbing forever and not give me, for example, the rate of messages-in over the last minute.
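A minimal sketch of the desired usage against the org.apache.kafka.common.metrics API; the sensor/metric names and fetchBatch() are illustrative stand-ins:

{code:java}
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

Metrics metrics = new Metrics();
Sensor msgsIn = metrics.sensor("messages-in");
// Rate's default underlying stat sums the values passed to record() over the
// sample window, so record(n) contributes n -- unlike Rate(new Count()),
// where every record() call contributes exactly 1.
msgsIn.add(metrics.metricName("msgs-in-rate", "my-group",
        "messages consumed per second"), new Rate());

// Record the whole batch size in one call instead of looping per element.
List<?> incoming = fetchBatch(); // 'fetchBatch' is an illustrative source
msgsIn.record(incoming.size());
{code}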
[jira] [Commented] (KAFKA-3564) Count metric always increments by 1.0
[ https://issues.apache.org/jira/browse/KAFKA-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247662#comment-15247662 ]

Michael Coon commented on KAFKA-3564:
--------------------------------------

Hmm, Sum wasn't available when I cloned the codebase. I'll have to update and see if it fits the bill. So what does Count represent in metric terms, then? If it were a kind of "instantaneous" snapshot count--like "how many of X are in memory right now"--cool; but there is no way to decrement the count, so I'm not sure how valuable that would be. Anyway, I'll check the Sum metric out. Thanks!
[jira] [Created] (KAFKA-3564) Count metric always increments by 1.0
Michael Coon created KAFKA-3564:
-----------------------------------

             Summary: Count metric always increments by 1.0
                 Key: KAFKA-3564
                 URL: https://issues.apache.org/jira/browse/KAFKA-3564
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.10.1.0
            Reporter: Michael Coon

The Count metric's update method always increments its value by 1.0 instead of the value passed to it. If this is by design, it's misleading, as I want to be able to count based on the values I send to the record method.
[jira] [Created] (KAFKA-3538) Abstract the creation/retrieval of Producer for stream sinks for unit testing
Michael Coon created KAFKA-3538:
-----------------------------------

             Summary: Abstract the creation/retrieval of Producer for stream sinks for unit testing
                 Key: KAFKA-3538
                 URL: https://issues.apache.org/jira/browse/KAFKA-3538
             Project: Kafka
          Issue Type: New Feature
          Components: streams
    Affects Versions: 0.10.1.0
            Reporter: Michael Coon
            Assignee: Guozhang Wang
            Priority: Minor

The StreamThread creates producers/consumers directly as KafkaProducer and KafkaConsumer, thus eliminating my ability to unit test my streams code without having an active Kafka nearby. Could this be abstracted so that it relies on an optional ProducerProvider or ConsumerProvider implementation that could inject a mock producer/consumer for unit testing? We do this in all our Kafka code for unit testing; if a provider is not supplied at runtime, we create the concrete KafkaProducer/KafkaConsumer by default (a sketch of the pattern follows).
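A minimal sketch of the provider pattern described above; the ProducerProvider interface is the hypothetical abstraction this issue proposes, while MockProducer is the client library's existing test double:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;

// Hypothetical provider interface; not an existing Kafka Streams API.
public interface ProducerProvider {
    <K, V> Producer<K, V> getProducer(Map<String, Object> config);
}

// Default: the concrete client, as StreamThread builds today.
class DefaultProducerProvider implements ProducerProvider {
    public <K, V> Producer<K, V> getProducer(Map<String, Object> config) {
        return new KafkaProducer<>(config);
    }
}

// Test double: injects MockProducer, so no broker is needed in unit tests.
class MockProducerProvider implements ProducerProvider {
    public <K, V> Producer<K, V> getProducer(Map<String, Object> config) {
        return new MockProducer<>();
    }
}
{code}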
[jira] [Created] (KAFKA-3537) Provide access to low-level Metrics in ProcessorContext
Michael Coon created KAFKA-3537:
-----------------------------------

             Summary: Provide access to low-level Metrics in ProcessorContext
                 Key: KAFKA-3537
                 URL: https://issues.apache.org/jira/browse/KAFKA-3537
             Project: Kafka
          Issue Type: New Feature
          Components: streams
    Affects Versions: 0.9.0.1
            Reporter: Michael Coon
            Assignee: Guozhang Wang
            Priority: Minor

It would be good to have access to the underlying Metrics component in StreamMetrics. StreamMetrics forces a naming convention for metrics that does not fit our use case for reporting. We need to be able to convert the stream metrics to our own metrics formatting, and it's cumbersome to extract group/op names from the pre-formatted strings as they are set up in StreamMetricsImpl. If there were a "metrics()" method on StreamMetrics to give me the underlying Metrics object, I could register my own sensors/metrics as needed (see the sketch below).
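A sketch of how the proposed accessor might be used from a processor; the metrics() call on StreamMetrics is the accessor this issue requests (it does not exist), and 'context' and 'elapsedMs' are assumed processor state:

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;

// Hypothetical: metrics() is the accessor proposed in this issue; StreamMetrics
// does not currently expose its underlying Metrics registry.
Metrics underlying = context.metrics().metrics(); // proposed accessor
Sensor myLatency = underlying.sensor("my-custom-latency");
myLatency.add(underlying.metricName("latency-avg", "my-group"), new Avg());
myLatency.record(elapsedMs); // our own naming/formatting, not StreamMetrics'
{code}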
[jira] [Created] (KAFKA-3535) Add metrics ability for streams serde components
Michael Coon created KAFKA-3535:
-----------------------------------

             Summary: Add metrics ability for streams serde components
                 Key: KAFKA-3535
                 URL: https://issues.apache.org/jira/browse/KAFKA-3535
             Project: Kafka
          Issue Type: New Feature
          Components: streams
    Affects Versions: 0.9.0.1
            Reporter: Michael Coon
            Assignee: Guozhang Wang
            Priority: Minor

Add the ability to record metrics in the serializer/deserializer components. As it stands, I cannot record latency/sensor metrics, since the API does not provide the context at the serde level. Exposing the ProcessorContext at this level may not be the solution; perhaps instead change the configure method to take a different config or init context and make the StreamMetrics available in that context along with the config information. (A sketch of a workaround at the serde level follows.)
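An illustrative workaround for the missing feature, using only the existing Deserializer contract: a wrapping deserializer that records a metric per record. The "metrics.sensor" config key and the idea of passing a live Sensor through the config map are assumptions, not a Streams API:

{code:java}
import java.util.Map;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative wrapper; MeteredDeserializer is not an existing Kafka class.
public class MeteredDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;
    private Sensor recordsIn; // set in configure()

    public MeteredDeserializer(Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Assumption: the application smuggled a Sensor into the config map.
        this.recordsIn = (Sensor) configs.get("metrics.sensor");
        inner.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (recordsIn != null && data != null)
            recordsIn.record(data.length); // e.g., bytes-in per record
        return inner.deserialize(topic, data);
    }

    @Override
    public void close() {
        inner.close();
    }
}
{code}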
[jira] [Created] (KAFKA-3534) Deserialize on demand when default time extractor used
Michael Coon created KAFKA-3534:
-----------------------------------

             Summary: Deserialize on demand when default time extractor used
                 Key: KAFKA-3534
                 URL: https://issues.apache.org/jira/browse/KAFKA-3534
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 0.9.0.1
            Reporter: Michael Coon
            Assignee: Guozhang Wang
            Priority: Minor

When records are added to the RecordQueue, they are deserialized at that time in order to extract the timestamp. But for data flows that consume large messages (particularly compressed messages), this can result in large spikes in memory, since all messages must be deserialized prior to processing (risking running out of memory). An optimization might be to only require deserialization at this stage when a non-default timestamp extractor is being used (see the sketch below).
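A sketch of the proposed optimization inside the record queue, assuming the 0.10-era message format where ConsumerRecord carries a built-in timestamp; isDefaultExtractor, deserialize, enqueue, and enqueueLazily are illustrative stand-ins for the Streams internals, not the actual implementation:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch only: defer key/value deserialization when the default extractor
// is in use, since it needs nothing beyond the record's built-in timestamp.
void addRawRecord(ConsumerRecord<byte[], byte[]> raw, TimestampExtractor extractor) {
    if (isDefaultExtractor(extractor)) {
        // Default extractor: the raw bytes suffice, so deserialization can
        // wait until the record is actually processed.
        enqueueLazily(raw, raw.timestamp());
    } else {
        // A custom extractor may inspect the key/value, so deserialize now.
        ConsumerRecord<Object, Object> rec = deserialize(raw);
        enqueue(rec, extractor.extract(rec));
    }
}
{code}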