[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient

2016-09-27 Thread Michael Coon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527100#comment-15527100
 ] 

Michael Coon commented on KAFKA-4224:
-

I cannot copy/paste the stack trace...sorry, I meant "cannot", not "can".

> IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient
> -
>
> Key: KAFKA-4224
> URL: https://issues.apache.org/jira/browse/KAFKA-4224
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.1
>Reporter: Michael Coon
>
> For whatever reason, I seem to have a corrupted message returned from a broker 
> that puts the consumer into an infinite loop. 
> org.apache.kafka.clients.consumer.internals.Fetcher (line 590) gets the next 
> record from the RecordsIterator of MemoryRecords, but when it attempts to 
> decode the record, an IndexOutOfBoundsException is thrown. Unfortunately, that 
> exception is merely logged and the Fetcher goes on to get the next message. But 
> the exception apparently does not move the underlying buffer read position 
> forward in such a way that it would actually go and get the next record. The 
> result: it keeps trying to read the corrupted record but can't make progress. 
> I offer two potential solutions: 
> 1) Throw the exception up to me and let me figure out whether I want to skip 
> forward in offsets.
> 2) Make sure the underlying RecordsIterator actually moves forward on 
> exceptions so that progress can be made when corrupted messages are found.





[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient

2016-09-27 Thread Michael Coon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527098#comment-15527098
 ] 

Michael Coon commented on KAFKA-4224:
-

Gotcha. Unfortunately, the system throwing the exception is an isolated system 
and I can copy/paste the stack trace.

I would need to test 0.10.0.1 to see if the new record parsing code would throw 
a different exception. I still don't believe that code would give me the detail I 
need (i.e., offset/partition). 



[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient

2016-09-27 Thread Michael Coon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15527018#comment-15527018
 ] 

Michael Coon commented on KAFKA-4224:
-

As I said, line 278 of NetworkClient 
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java)
 is where the stack trace is being logged. It's catching java.lang.Exception, 
so anything short of an Error would be caught and just logged. I have no 
visibility into the problem from my consumer.poll() call: I just get back empty 
records (or, in the case of a consumer with multiple partitions assigned, one 
partition that never makes progress). If, instead of catching 
java.lang.Exception, the Fetcher callback invoked on line 278 could throw a more 
detailed exception, and NetworkClient could pass that up to my app code somehow, 
that's all I would need to know something is wrong with a particular partition 
at a particular offset. I understand why the code punts and just logs the 
problem--at this point it doesn't know which partitions are failing within the 
body of the ClientResponse; that doesn't get fleshed out until further down in 
the callback code. That's why something further down would have to pass back 
some kind of partial status indicating which partitions failed, for what reason, 
and at what offset. That would be ideal. But again, I get why they just log it 
at this point...there is no mechanism for partial results.

As it stands, if my consumer is managing multiple partitions, it has no way of 
knowing there is a problem with a particular partition unless it checks the 
progress of all partitions after each poll call. That's the part that's highly 
inefficient.
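For concreteness, here is a minimal sketch (not the reporter's actual code) of the 
per-partition progress check described above; the class name, threshold, and 
skip-forward policy are illustrative assumptions, and skipping an offset this way 
can silently drop data.

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ProgressCheckingPollLoop {

    // Last observed position per partition, used to detect a partition that is stuck.
    private final Map<TopicPartition, Long> lastPositions = new HashMap<>();
    private final Map<TopicPartition, Integer> stuckPolls = new HashMap<>();
    private static final int MAX_STUCK_POLLS = 10; // illustrative threshold

    public void pollLoop(KafkaConsumer<byte[], byte[]> consumer) {
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            // ... hand records off for processing ...

            // The inefficient workaround: after every poll, compare each assigned
            // partition's position with the previously observed position.
            for (TopicPartition tp : consumer.assignment()) {
                long position = consumer.position(tp);
                Long previous = lastPositions.put(tp, position);
                if (previous != null && previous == position) {
                    int count = stuckPolls.merge(tp, 1, Integer::sum);
                    if (count >= MAX_STUCK_POLLS) {
                        // Guess that a corrupted record is blocking progress and skip
                        // past it. This can silently drop data, which is why a detailed
                        // exception from the client would be preferable.
                        consumer.seek(tp, position + 1);
                        stuckPolls.remove(tp);
                    }
                } else {
                    stuckPolls.remove(tp);
                }
            }
        }
    }
}
{code}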
 



[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient

2016-09-27 Thread Michael Coon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526908#comment-15526908
 ] 

Michael Coon commented on KAFKA-4224:
-

Line 278 of NetworkClient attempts to parse the record by calling the Fetcher 
response callback. It wraps that callback in try/catch, catches "Exception", and 
just logs it. It does NOT throw it back to me. What would be awesome is if, when 
a corrupted record is found, some "CorruptedRecordException" were thrown back to 
me so I could interrogate that exception for the corrupted 
offset/topic/partition and either skip it, log it, do something magical, 
whatever. That gives me the ability to handle the actual situation instead of 
guessing at what I should do with the problem. The real challenge is that, with 
a consumer assigned multiple partitions, that consumer needs to check the offset 
progress of all partitions and then attempt to advance those that have not made 
progress--which may or may not be due to corrupted records. It's not clean, 
efficient, or clear at all.
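A hedged sketch of the proposed behavior: the CorruptedRecordException below, with 
its partition/offset accessors, is purely hypothetical (it is what this comment 
asks for, not something the 0.10.0.x client throws back to application code).

{code:java}
import org.apache.kafka.common.TopicPartition;

// Hypothetical exception carrying the detail this comment asks for. Nothing like this
// is thrown back to application code by the 0.10.0.x consumer; the class and the usage
// shown in the trailing comment are purely illustrative.
public class CorruptedRecordException extends RuntimeException {

    private final TopicPartition partition;
    private final long offset;

    public CorruptedRecordException(TopicPartition partition, long offset, Throwable cause) {
        super("Corrupt record at " + partition + ", offset " + offset, cause);
        this.partition = partition;
        this.offset = offset;
    }

    public TopicPartition partition() { return partition; }

    public long offset() { return offset; }
}

// Application code could then decide what to do with the bad offset, e.g.:
//
//     try {
//         ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
//         process(records);
//     } catch (CorruptedRecordException e) {
//         consumer.seek(e.partition(), e.offset() + 1);  // skip it, log it, or do something magical
//     }
{code}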



[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient

2016-09-27 Thread Michael Coon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526189#comment-15526189
 ] 

Michael Coon commented on KAFKA-4224:
-

I think this catches the problem with IndexOutOfBounds, but it would still yield 
a caught exception at the NetworkClient level. Basically, the challenge is that 
the exception is caught inside NetworkClient and I ultimately get back a 
ConsumerRecords with a count of 0. I have no way of knowing that there was a 
problem, so I keep trying. Oddly, even though subsequent fetch requests should 
theoretically move the position in the partition forward, I keep getting the 
error over and over. I can keep moving the offset forward until a fetch 
succeeds, but that seems like it will introduce other problems, or potentially 
skip valid records if it takes too many attempts to recover.



[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient

2016-09-27 Thread Michael Coon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15526080#comment-15526080
 ] 

Michael Coon commented on KAFKA-4224:
-

In doing more digging, the larger problem is that the consumer makes no progress 
past the current (corrupted) offset. The poll call eventually returns empty 
records, but when I poll again it merely attempts to pull back the same 
corrupted messages. I basically have to create a watchdog thread that detects 
the lack of forward progress and skips offsets accordingly.
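A minimal sketch of the kind of watchdog described here, under the assumption that 
the poll loop publishes its positions to a shared snapshot (KafkaConsumer is not 
thread-safe, so the watchdog never touches the consumer directly); the names and 
the check interval are illustrative.

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.TopicPartition;

public class ProgressWatchdog {

    // Positions published by the poll thread after each poll. KafkaConsumer is not
    // thread-safe, so the watchdog only reads this snapshot and never calls the consumer.
    private final Map<TopicPartition, Long> published = new ConcurrentHashMap<>();
    private final Map<TopicPartition, Long> lastSeen = new ConcurrentHashMap<>();
    // Partitions the poll thread should seek past on its next iteration.
    private final Set<TopicPartition> stuck = ConcurrentHashMap.newKeySet();

    public void publish(TopicPartition tp, long position) {
        published.put(tp, position);
    }

    public Set<TopicPartition> stuckPartitions() {
        return stuck;
    }

    public void start() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            for (Map.Entry<TopicPartition, Long> entry : published.entrySet()) {
                Long previous = lastSeen.put(entry.getKey(), entry.getValue());
                if (previous != null && previous.equals(entry.getValue())) {
                    stuck.add(entry.getKey()); // no forward progress since the last check
                }
            }
        }, 30, 30, TimeUnit.SECONDS); // illustrative interval
    }
}
{code}

The poll thread would then consult stuckPartitions() after each poll and perform 
the seek itself, which is exactly the kind of plumbing this issue argues the 
client should make unnecessary.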



[jira] [Commented] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient

2016-09-27 Thread Michael Coon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15525951#comment-15525951
 ] 

Michael Coon commented on KAFKA-4224:
-

I would love to give you a stack trace, but unfortunately I cannot provide one 
because the system with the error is in an isolated environment.



[jira] [Created] (KAFKA-4224) IndexOutOfBounds in RecordsIterator causes infinite loop in NetworkClient

2016-09-27 Thread Michael Coon (JIRA)
Michael Coon created KAFKA-4224:
---

 Summary: IndexOutOfBounds in RecordsIterator causes infinite loop 
in NetworkClient
 Key: KAFKA-4224
 URL: https://issues.apache.org/jira/browse/KAFKA-4224
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.10.0.1
Reporter: Michael Coon


For whatever reason, I seem to have a corrupted message returned from a broker 
that puts the consumer into an infinite loop. 
org.apache.kafka.clients.consumer.internals.Fetcher (line 590) gets the next 
record from the RecordsIterator of MemoryRecords, but when it attempts to decode 
the record, an IndexOutOfBoundsException is thrown. Unfortunately, that exception 
is merely logged and the Fetcher goes on to get the next message. But the 
exception apparently does not move the underlying buffer read position forward 
in such a way that it would actually go and get the next record. The result: it 
keeps trying to read the corrupted record but can't make progress. 

I offer two potential solutions: 

1) Throw the exception up to me and let me figure out whether I want to skip 
forward in offsets.
2) Make sure the underlying RecordsIterator actually moves forward on exceptions 
so that progress can be made when corrupted messages are found.





[jira] [Commented] (KAFKA-3564) Count metric always increments by 1.0

2016-04-19 Thread Michael Coon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15248385#comment-15248385
 ] 

Michael Coon commented on KAFKA-3564:
-

If you look at the implementation of Count, it only increments by 1.0 each time 
I call record (which calls Count's update via SampledStat). So it will only ever 
increment my count by 1, no matter what I pass as the actual value to record. 
For Total, you'll notice it overrides the record and measure methods and does 
NOT reset its total, ever. 

So if I want messages-in per second, I can't do new Rate(new Count()) because it 
will always increment its total by only 1.0. I have sub-elements of data coming 
into my processors that I want to use as the increment of the count. I would 
have to iterate through all of them and increment the count for each 
one...wasting cycles. Instead, I want to do something like 
"msgsIn.record(incoming.size())" and be done with it.

If I use new Rate(new Total()), then the total never resets, so the "rate" would 
just keep climbing forever and would not give me, for example, the rate of 
messages-in over the last minute.
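For reference, a minimal sketch of the usage pattern being asked for, using the 
org.apache.kafka.common.metrics API; whether Rate's default stat actually 
accumulates the recorded value this way depends on the client version under 
discussion, so treat the snippet as illustrative rather than a statement of 
current behavior.

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class MessagesInRateExample {

    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor msgsIn = metrics.sensor("messages-in");
        // The goal: record a batch size once and have the rate reflect all of it,
        // rather than iterating the batch and calling record() once per element.
        msgsIn.add(metrics.metricName("messages-in-rate", "my-stream-metrics",
                "Messages consumed per second"), new Rate());

        int batchSize = 500;          // e.g. incoming.size()
        msgsIn.record(batchSize);     // one call per batch, not one per message
        metrics.close();
    }
}
{code}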

> Count metric always increments by 1.0
> -
>
> Key: KAFKA-3564
> URL: https://issues.apache.org/jira/browse/KAFKA-3564
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Michael Coon
>Assignee: Kim Christensen
>
> The Count metric's update method always increments its value by 1.0 instead 
> of the value passed to it. If this is by design, it's misleading as I want to 
> be able to count based on values I send to the record method.





[jira] [Commented] (KAFKA-3564) Count metric always increments by 1.0

2016-04-19 Thread Michael Coon (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247662#comment-15247662
 ] 

Michael Coon commented on KAFKA-3564:
-

Hmm, Sum wasn't available when I cloned the codebase. I'll have to update and 
see if that fits the bill. So what does Count represent in metric terms, then? 
If it were a kind of "instantaneous" snapshot count--like "how many of X are in 
memory right now"--that would be fine, but there is no way to decrement the 
count, so I'm not sure how valuable it would be. Anyway, I'll check the Sum 
metric out. Thanks!



[jira] [Created] (KAFKA-3564) Count metric always increments by 1.0

2016-04-15 Thread Michael Coon (JIRA)
Michael Coon created KAFKA-3564:
---

 Summary: Count metric always increments by 1.0
 Key: KAFKA-3564
 URL: https://issues.apache.org/jira/browse/KAFKA-3564
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Michael Coon


The Count metric's update method always increments its value by 1.0 instead of 
the value passed to it. If this is by design, it's misleading as I want to be 
able to count based on values I send to the record method.





[jira] [Created] (KAFKA-3538) Abstract the creation/retrieval of Producer for stream sinks for unit testing

2016-04-11 Thread Michael Coon (JIRA)
Michael Coon created KAFKA-3538:
---

 Summary: Abstract the creation/retrieval of Producer for stream 
sinks for unit testing
 Key: KAFKA-3538
 URL: https://issues.apache.org/jira/browse/KAFKA-3538
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Michael Coon
Assignee: Guozhang Wang
Priority: Minor


The StreamThread creates producers/consumers directly as KafkaProducer and 
KafkaConsumer, which eliminates my ability to unit test my streams code without 
an active Kafka cluster nearby. Could this be abstracted so that it relies on an 
optional ProducerProvider or ConsumerProvider implementation that could inject a 
mock producer/consumer for unit testing? We do this in all our Kafka code for 
unit testing: if a provider is not supplied at runtime, we create the concrete 
KafkaProducer/KafkaConsumer components by default.
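As a rough illustration of the requested abstraction (the ProducerProvider and 
DefaultProducerProvider names are made up for this sketch, not an actual Kafka 
Streams API), something like the following would let tests hand in a MockProducer 
while production code defaults to KafkaProducer:

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Hypothetical provider interface; StreamThread would ask this for its producer
// instead of constructing KafkaProducer directly.
public interface ProducerProvider {

    Producer<byte[], byte[]> newProducer(Map<String, Object> config);
}

// Default implementation used when no provider is supplied at runtime.
class DefaultProducerProvider implements ProducerProvider {

    @Override
    public Producer<byte[], byte[]> newProducer(Map<String, Object> config) {
        return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
    }
}

// In unit tests, a provider could return org.apache.kafka.clients.producer.MockProducer
// instead, so no live broker is required.
{code}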





[jira] [Created] (KAFKA-3537) Provide access to low-level Metrics in ProcessorContext

2016-04-11 Thread Michael Coon (JIRA)
Michael Coon created KAFKA-3537:
---

 Summary: Provide access to low-level Metrics in ProcessorContext
 Key: KAFKA-3537
 URL: https://issues.apache.org/jira/browse/KAFKA-3537
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.9.0.1
Reporter: Michael Coon
Assignee: Guozhang Wang
Priority: Minor


It would be good to have access to the underlying Metrics component behind 
StreamMetrics. StreamMetrics forces a naming convention for metrics that does 
not fit our use case for reporting. We need to convert the stream metrics to our 
own metrics format, and it's cumbersome to extract group/operation names from 
the pre-formatted strings the way they are set up in StreamMetricsImpl. If there 
were a "metrics()" method on StreamMetrics that gave me the underlying Metrics 
object, I could register my own sensors/metrics as needed.
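A hypothetical sketch of what the requested accessor might enable from a 
processor; a metrics() method exposing the low-level Metrics registry does not 
exist in this form and is exactly what the issue asks for.

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class CustomSensorExample {

    // Imagine something like context.metrics() handing this in; today the
    // low-level Metrics object is not reachable through StreamMetrics.
    public void registerSensors(Metrics metrics) {
        Sensor latency = metrics.sensor("my-op-latency");
        latency.add(metrics.metricName("my-op-latency-avg", "my-group"), new Avg());
        latency.add(metrics.metricName("my-op-latency-max", "my-group"), new Max());
    }
}
{code}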





[jira] [Created] (KAFKA-3535) Add metrics ability for streams serde components

2016-04-11 Thread Michael Coon (JIRA)
Michael Coon created KAFKA-3535:
---

 Summary: Add metrics ability for streams serde components
 Key: KAFKA-3535
 URL: https://issues.apache.org/jira/browse/KAFKA-3535
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 0.9.0.1
Reporter: Michael Coon
Assignee: Guozhang Wang
Priority: Minor


Add the ability to record metrics in the serializer/deserializer components. As 
it stands, I cannot record latency/sensor metrics, since the API does not 
provide the context at the serde level. Exposing the ProcessorContext at this 
level may not be the right solution; perhaps the configure method could instead 
take a different config or init context that makes StreamMetrics available along 
with the configuration information.





[jira] [Created] (KAFKA-3534) Deserialize on demand when default time extractor used

2016-04-08 Thread Michael Coon (JIRA)
Michael Coon created KAFKA-3534:
---

 Summary: Deserialize on demand when default time extractor used
 Key: KAFKA-3534
 URL: https://issues.apache.org/jira/browse/KAFKA-3534
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 0.9.0.1
Reporter: Michael Coon
Assignee: Guozhang Wang
Priority: Minor


When records are added to the RecordQueue, they are deserialized at that time in 
order to extract the timestamp. But for data flows where large messages are 
consumed (particularly compressed messages), this can result in large spikes in 
memory, as all messages must be deserialized prior to processing (and the 
process can run out of memory). An optimization might be to only require 
deserialization at this stage if a non-default timestamp extractor is being 
used.
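A rough sketch of the proposed optimization in plain Java, not actual Kafka 
Streams internals; the class, interface, and method names are illustrative 
assumptions.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative only: defer key/value deserialization when the timestamp does not
// require looking inside the record (the default, record-timestamp-based extractor).
public class LazyTimestampedRecord {

    // Stand-in for a timestamp extractor that needs the deserialized payload.
    interface PayloadTimestampExtractor {
        long extract(Object key, Object value);
    }

    private final ConsumerRecord<byte[], byte[]> raw;

    public LazyTimestampedRecord(ConsumerRecord<byte[], byte[]> raw) {
        this.raw = raw;
    }

    // Default-extractor path: the record's own timestamp is enough, so nothing is deserialized.
    public long timestamp() {
        return raw.timestamp();
    }

    // Custom-extractor path: deserialize only because the extractor needs the payload.
    public long timestamp(PayloadTimestampExtractor extractor,
                          Deserializer<?> keyDeserializer,
                          Deserializer<?> valueDeserializer) {
        Object key = keyDeserializer.deserialize(raw.topic(), raw.key());
        Object value = valueDeserializer.deserialize(raw.topic(), raw.value());
        return extractor.extract(key, value);
    }
}
{code}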


