[jira] [Commented] (KAFKA-3894) Log Cleaner thread crashes and never restarts
[ https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346583#comment-15346583 ]

The Data Lorax commented on KAFKA-3894:
---
[~ijuma] Not sure this is related. Our situation seems to be just a legitimate large number of records in the segment.

> Log Cleaner thread crashes and never restarts
> ---------------------------------------------
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: Oracle JDK 8
> Ubuntu Precise
> Reporter: Tim Carey-Smith
> Labels: compaction
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be too large to fit into the dedupe buffer.
> The result of this is a log line:
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner \[kafka-log-cleaner-thread-0\], Error due to
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where cleaning of compacted topics is not running.
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason for this upper bound, or if this is a value which must be tuned for each specific use-case.
> Of more immediate concern is the fact that the thread crash is not visible via JMX or exposed as some form of service degradation.
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
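As a back-of-the-envelope check, the figures in the error line above are consistent with each offset-map entry costing 24 bytes (a 16-byte MD5 hash plus an 8-byte offset) and the map being filled to a 0.9 load factor. The sketch below derives the map capacity for a given dedupe buffer and the buffer size a given segment would need; the class and method names are illustrative assumptions, not Kafka code.

```java
// Sketch of the dedupe-buffer arithmetic implied by the error above.
// Assumed constants: 24 bytes per map entry, 0.9 load factor.
public class DedupeBufferMath {
    static final int BYTES_PER_ENTRY = 24;   // MD5 digest (16) + offset (8)
    static final double LOAD_FACTOR = 0.9;

    // How many unique keys a dedupe buffer of the given size can hold.
    static long mapCapacity(long dedupeBufferBytes) {
        return (long) (dedupeBufferBytes * LOAD_FACTOR) / BYTES_PER_ENTRY;
    }

    // Minimum log.cleaner.dedupe.buffer.size needed to dedupe a segment.
    static long requiredBufferBytes(long messagesInSegment) {
        return (long) Math.ceil(messagesInSegment * BYTES_PER_ENTRY / LOAD_FACTOR);
    }

    public static void main(String[] args) {
        // A 128 MiB buffer reproduces the 5033164 figure from the log line.
        System.out.println(mapCapacity(128L * 1024 * 1024));   // 5033164
        // The failing segment held 9750860 messages:
        System.out.println(requiredBufferBytes(9750860L));     // ~260 MB
    }
}
```

Under these assumptions, the 9750860-message segment in the log line would need roughly a 260 MB dedupe buffer rather than the 128 MiB default.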
[jira] [Commented] (KAFKA-3894) Log Cleaner thread crashes and never restarts
[ https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346580#comment-15346580 ]

The Data Lorax commented on KAFKA-3894:
---
Yep, we've run into the same issue. It would be nice if the cleaner, at the very minimum, skipped segments with a large number of records.
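The skipping behaviour suggested in this comment could look something like the sketch below: instead of crashing the cleaner thread when a segment's message count exceeds the offset map's remaining slots, oversized segments are passed over for a later pass. This is purely illustrative logic under those assumptions; the method and types are invented for the example and are not the actual LogCleaner code.

```java
// Hypothetical sketch of "skip oversized segments instead of crashing".
import java.util.ArrayList;
import java.util.List;

public class SkipOversized {
    // Given per-segment message counts and the offset map's total slots,
    // return the segments that fit, skipping (rather than throwing on)
    // any segment that would overflow the remaining slots.
    static List<Long> cleanableSegments(List<Long> segmentMessageCounts, long mapSlots) {
        List<Long> cleanable = new ArrayList<>();
        long slotsRemaining = mapSlots;
        for (long count : segmentMessageCounts) {
            if (count <= slotsRemaining) {
                cleanable.add(count);
                slotsRemaining -= count;
            }
            // else: skip this segment instead of failing the whole thread
        }
        return cleanable;
    }

    public static void main(String[] args) {
        // A 10-message segment is skipped when only 2 slots remain.
        System.out.println(cleanableSegments(List.of(3L, 10L, 2L), 5L)); // [3, 2]
    }
}
```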
[jira] [Comment Edited] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313773#comment-15313773 ]

The Data Lorax edited comment on KAFKA-3775 at 6/3/16 7:24 AM:
---
I wonder how such a design would maintain the level of resiliency currently offered? At the moment, in a running multi-process cluster, the other processes pick up the slack if one of them should fail. With the proposed design some partitions would remain without a consumer. This seems like a fundamental switch away from Kafka's current model, and a risky one IMHO. Could you also elaborate on why settings such as 'max.poll.records' don't help stop your initial instance going pop? Maybe there are other alternative solutions here...

was (Author: bigandy):
I wonder how surah a design would maintain the level of resiliency currently offered? At the moment, in a running multi-process cluster the other processes pick up the slack if one of them should fail. With the purposed design some partitions would remain with out a consumer. This seems like a fundamental switch away from Kafka's current model. Could you also elaborate on why settings such as 'max.poll.records' don't help stop your initial instance going pop? Maybe there are other alternative solutions here...

> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Yuto Kawamura
> Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
> As of today, if I start a Kafka Streams app on a single machine which consists of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and heavy message traffic, it is a problem that we don't have a way of throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more than 10MB/sec of traffic per partition, we saw all partitions assigned to the first instance, and soon the app died from OOM.
> I know there are some workarounds worth considering here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
> => Maybe works, but as Kafka Streams is a library, not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a first single instance and check that it works as expected with a lesser number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure.
> => Maybe works, but still has two problems IMO:
> - It still leads to a traffic explosion with high-throughput processing, as it accepts all incoming messages from hundreds of partitions.
> - In the first place, by distributed-system principles, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to provide the maximum number of partitions that is considered feasible to process with a single instance (or host).
> Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which limits the number of tasks (a notion of partition) assigned to the processId (the notion of a single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate an incomplete assignment. That is, Kafka Streams should continue working for its share of partitions even when some partitions are left unassigned, in order to satisfy the above: "users may want the option to start a first single instance and check that it works as expected with a lesser number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue refining it.
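As a rough illustration of the proposed {{max.tasks.assigned}} cap and the "incomplete assignment" it implies, the sketch below round-robins tasks across instances but stops assigning to an instance once it reaches the cap, leaving any overflow tasks unassigned. The class, method, and parameter names are hypothetical, not the real StreamPartitionAssignor API.

```java
// Illustrative capped round-robin task assignment, leaving overflow unassigned.
import java.util.*;

public class CappedAssignor {
    static Map<String, List<Integer>> assign(List<String> instances, int numTasks, int maxTasksAssigned) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String i : instances) assignment.put(i, new ArrayList<>());
        int idx = 0;
        for (int task = 0; task < numTasks; task++) {
            // Scan for the next instance with spare capacity.
            int tried = 0;
            while (tried < instances.size()
                    && assignment.get(instances.get(idx % instances.size())).size() >= maxTasksAssigned) {
                idx++;
                tried++;
            }
            if (tried == instances.size()) break; // all instances at cap: task stays unassigned
            assignment.get(instances.get(idx % instances.size())).add(task);
            idx++;
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two instances, five tasks, cap of two: task 4 is left unassigned,
        // which is exactly the incomplete assignment the comment above warns about.
        System.out.println(assign(List.of("a", "b"), 5, 2)); // {a=[0, 2], b=[1, 3]}
    }
}
```

A later rebalance (or new instance joining) would be expected to pick up the unassigned tasks, which is where the resiliency question raised in the comment comes in.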
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313773#comment-15313773 ]

The Data Lorax commented on KAFKA-3775:
---
I wonder how such a design would maintain the level of resiliency currently offered? At the moment, in a running multi-process cluster, the other processes pick up the slack if one of them should fail. With the proposed design some partitions would remain without a consumer. This seems like a fundamental switch away from Kafka's current model. Could you also elaborate on why settings such as 'max.poll.records' don't help stop your initial instance going pop? Maybe there are other alternative solutions here...
[jira] [Commented] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed
[ https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219894#comment-15219894 ]

The Data Lorax commented on KAFKA-3456:
---
OK, I'm getting the feeling there is little appetite to change the way this works, and people would rather stick with the known limitations of the metrics. That being the case, I'm happy(ish) for this to be closed. If, on the other hand, we'd prefer more correct metrics, then I'm happy to work on a PR...

> In-house KafkaMetric misreports metrics when periodically observed
> ------------------------------------------------------------------
>
> Key: KAFKA-3456
> URL: https://issues.apache.org/jira/browse/KAFKA-3456
> Project: Kafka
> Issue Type: Bug
> Components: consumer, core, producer
> Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
> Reporter: The Data Lorax
> Assignee: Neha Narkhede
> Priority: Minor
>
> The metrics captured by Kafka through the in-house {{SampledStat}} suffer from misreporting if observed in a periodic manner.
> Consider a {{Rate}} metric using the default 2 samples and 30 second sample window, i.e. the {{Rate}} is capturing 60 seconds' worth of data. So, to report this metric to some external system we might poll it every 60 seconds to observe the current value. Using a shorter period would, in the case of a {{Rate}}, lead to smoothing of the plotted data and, worse, in the case of a {{Count}}, would lead to double counting - so 60 seconds is the only period at which we can poll the metrics if we are to report accurate values.
> To demonstrate the issue, consider the following somewhat extreme case:
> The {{Rate}} is capturing data from a system which alternates between a 999 per sec rate and a 1 per sec rate every 30 seconds, with the different rates aligned with the sample boundaries within the {{Rate}} instance, i.e. after 60 seconds the first sample within the {{Rate}} instance will have a rate of 999 per sec, and the second 1 per sec.
> If we were to ask the metric for its value at this 60 second boundary it would correctly report 500 per sec. However, if we asked it again 1 millisecond later it would report 1 per sec, as the first sample window has been aged out. Depending on how far into the 60 sec period of the metric our periodic poll fell, we would observe a constant rate somewhere in the range of 1 to 500 per second, most likely around the 250 mark.
> Other metrics based off of the {{SampledStat}} type suffer from the same issue, e.g. the {{Count}} metric, given a constant rate of 1 per second, will report a constant count somewhere between 30 and 60, rather than the correct 60.
> This can be seen in the following test code:
> {code:java}
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>     }
>
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure at bucket boundary (though the same issue exists for all periodic measurements):
>         final double m1 = rate.measure(metricsConfig, t(2));    // m1 = 1.0
>
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure second pair of samples:
>         final double m2 = rate.measure(metricsConfig, t(4));    // m2 = 1.0
>
>         assertEquals("Measurement of the rate over the first two samples", 500.0, m1, 2.0);
>         assertEquals("Measurement of the rate over the last two samples", 500.0, m2, 2.0);
>     }
>
>     @Test
>     public void testHowRateDropsMetricsWithRetardedObservations() throws Exception {
>         final long retardation = 1000;
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>
>         double m1 = 0.0;
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(2) + retardation) {
>                 m1 = rate.measure(metricsConfig, time);    // m1 = 65.something
>             }
>         }
>         // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             // ...
> {code}
[jira] [Commented] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed
[ https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213062#comment-15213062 ]

The Data Lorax commented on KAFKA-3456:
---
Yep, increasing the number of samples certainly helps reduce the error. Likewise, with the n+1 model, increasing the samples reduces the latency. I think the issue is not so much the variable time period as the fact that any data captured in the current sample after the periodic observation is never reported, as it's evicted before the next observation. Consider a Count metric with 2 samples within a 60s period that is being incremented every second. If we observe the metric halfway through a sample then the count will be 45, not 60, and it will continue to be 45 if we keep observing the metric every minute. For me, I'd prefer correct, delayed metrics over timely metrics I can't trust. And I'd prefer to be able to reduce the delay by using more samples than to be able to reduce, but not remove, the error.
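The 45-versus-60 arithmetic in the comment above can be written out directly: with 2 samples of 30 s each and one event per second, an observation made halfway into the current sample sees one complete window plus half of the partial one, while the 15 events recorded after the observation are evicted before the next read. A minimal sketch; the class and method names are illustrative only, not the Kafka metrics API.

```java
// Tiny model of the periodic-observation under-count described above.
public class CountUndercount {
    // Events visible at an observation made offsetSec into the current sample,
    // for a 1-event/sec rate with `samples` windows of `windowSec` seconds each:
    // the completed windows plus the partial current one.
    static int observedCount(int samples, int windowSec, int offsetSec) {
        return (samples - 1) * windowSec + offsetSec;
    }

    public static void main(String[] args) {
        // 2 samples of 30 s, observed 15 s into the current sample:
        System.out.println(observedCount(2, 30, 15)); // 45, not the true 60
    }
}
```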
[jira] [Commented] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed
[ https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211669#comment-15211669 ]

The Data Lorax commented on KAFKA-3456:
---
I see your point here, Jay. But surely, in general, we're not looking for the rate at the point of observation, but the rate over the monitored period, i.e. in this case 60 seconds. Also, if we switch to discussing Count instead of Rate, I think the issue may be more apparent. Count suffers from the same issue. Again, I see the aim of a Count metric as counting over the entire 60s period. The aggregation of a 60s periodic observation of the count should get pretty close to the true value. But with the current code the true value will likely be much higher, especially in the presence of a volatile input profile.
[jira] [Commented] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed
[ https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210060#comment-15210060 ]

The Data Lorax commented on KAFKA-3456:
---
[~aauradkar], I see you've made changes in this area - would be interested to hear your thoughts...
[jira] [Updated] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed
[ https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

The Data Lorax updated KAFKA-3456:
---
[jira] [Updated] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed
[ https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] The Data Lorax updated KAFKA-3456: -- Description: The metrics captured by Kafka through the in-house {{SampledStat}} suffer from misreporting metrics if observed in a periodic manner. Consider a {{Rate}} metric that is using the default 2 samples and 30 second sample window i.e. the {{Rate}} is capturing 60 seconds worth of data. So, to report this metric to some external system we might poll it every 60 seconds to observe the current value. Using a shorter period would, in the case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the case of a {{Count}}, would lead to double counting - so 60 seconds is the only period at which we can poll the metrics if we are to report accurate metrics. To demonstrate the issue consider the following somewhat extreme case: The {{Rate}} is capturing data from a system which alternates between a 999 per sec rate and a 1 per sec rate every 30 seconds, with the different rates aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 seconds the first sample within the {{Rate}} instance will have a rate of 999 per sec, and the second 1 per sec. If we were to ask the metric for its value at this 60 second boundary it would correctly report 500 per sec. However, if we asked it again 1 millisecond later it would report 1 per sec, as the first sample window has been aged out. Depending on how retarded into the 60 sec period of the metric our periodic poll of the metric was, we would observe a constant rate somewhere in the range of 1 to 500 per second, most likely around the 250 mark. Other metrics based off of the {{SampledStat}} type suffer from the same issue e.g. the {{Count}} metric, given a constant rate of 1 per second, will report a constant count somewhere between 30 and 60, rather than the correct 60. 
This can be seen in the following test code:
{code:java}
public class MetricsTest {
    private MetricConfig metricsConfig;

    @Before
    public void setUp() throws Exception {
        metricsConfig = new MetricConfig();
    }

    private long t(final int bucket) {
        return metricsConfig.timeWindowMs() * bucket;
    }

    @Test
    public void testHowRateDropsMetrics() throws Exception {
        Rate rate = new Rate();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(30, TimeUnit.SECONDS);

        // First sample window from t0 -> (t1 - 1), with rate 999 per second:
        for (long time = t(0); time != t(1); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Second sample window from t1 -> (t2 - 1), with rate 1 per second:
        for (long time = t(1); time != t(2); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        // Measure at the bucket boundary (though the same issue exists for all periodic measurements):
        final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0

        // Third sample window from t2 -> (t3 - 1), with rate 999 per second:
        for (long time = t(2); time != t(3); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Fourth sample window from t3 -> (t4 - 1), with rate 1 per second:
        for (long time = t(3); time != t(4); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        // Measure the second pair of samples:
        final double m2 = rate.measure(metricsConfig, t(4)); // m2 = 1.0

        assertEquals("Measurement of the rate over the first two samples", 500.0, m1, 2.0);
        assertEquals("Measurement of the rate over the last two samples", 500.0, m2, 2.0);
    }

    @Test
    public void testHowRateDropsMetricsWithRetardedObservations() throws Exception {
        final long retardation = 1000;
        Rate rate = new Rate();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(30, TimeUnit.SECONDS);

        // First sample window from t0 -> (t1 - 1), with rate 999 per second:
        for (long time = t(0); time != t(1); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Second sample window from t1 -> (t2 - 1), with rate 1 per second:
        for (long time = t(1); time != t(2); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        double m1 = 0.0;

        // Third sample window from t2 -> (t3 - 1), with rate 999 per second:
        for (long time = t(2); time != t(3); time += 1000) {
            rate.record(metricsConfig, 999, time);
            if (time == t(2) + retardation) {
                m1 = rate.measure(metricsConfig, time); // m1 = 65.something
            }
        }

        // Fourth sample window from t3 -> (t4 - 1), with rate 1 per second:
        for (long time = t(3); time != t(4); time += 1000) {
[jira] [Created] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed
The Data Lorax created KAFKA-3456: - Summary: In-house KafkaMetric misreports metrics when periodically observed Key: KAFKA-3456 URL: https://issues.apache.org/jira/browse/KAFKA-3456 Project: Kafka Issue Type: Bug Components: consumer, core, producer Affects Versions: 0.9.0.1, 0.9.0.0, 0.10.0.0 Reporter: The Data Lorax Assignee: Neha Narkhede Priority: Minor The metrics captured by Kafka through the in-house {{SampledStat}} suffer from misreporting metrics if observed in a periodic manner. Consider a {{Rate}} metric that is using the default 2 samples and 30 second sample window i.e. the {{Rate}} is capturing 60 seconds worth of data. So, to report this metric to some external system we might poll it every 60 seconds to observe the current value. Using a shorter period would, in the case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the case of a {{Count}}, would lead to double counting - so 60 seconds is the only period at which we can poll the metrics if we are to report accurate metrics. To demonstrate the issue consider the following somewhat extreme case: The {{Rate}} is capturing data from a system which alternates between a 999 per sec rate and a 1 per sec rate every 30 seconds, with the different rates aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 seconds the first sample within the {{Rate}} instance will have a rate of 999 per sec, and the second 1 per sec. If we were to ask the metric for its value at this 60 second boundary it would correctly report 500 per sec. However, if we asked it again 1 millisecond later it would report 1 per sec, as the first sample window has been aged out. Depending on how retarded into the 60 sec period of the metric our periodic poll of the metric was, we would observe a constant rate somewhere in the range of 1 to 500 per second, most likely around the 250 mark. Other metrics based off of the {{SampledStat}} type suffer from the same issue e.g. 
the {{Count}} metric, given a constant rate of 1 per second, will report a constant count somewhere between 30 and 60, rather than the correct 60. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
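The sample-aging behaviour described in KAFKA-3456 can be reproduced with a small, self-contained simulation. The following is a hypothetical sketch of a two-sample windowed rate that purges expired samples on measurement; it is not Kafka's actual {{Rate}}/{{SampledStat}} implementation, and all class and method names here are invented for illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SampledRateDemo {
    static final long WINDOW_MS = 30_000; // 30 second sample window
    static final int NUM_SAMPLES = 2;     // two samples = 60 seconds of data

    static final class Sample {
        final long startMs;
        double total;
        Sample(long startMs) { this.startMs = startMs; }
    }

    private final Deque<Sample> samples = new ArrayDeque<>();

    void record(double value, long nowMs) {
        Sample current = samples.peekLast();
        if (current == null || nowMs - current.startMs >= WINDOW_MS) {
            current = new Sample(nowMs - (nowMs % WINDOW_MS)); // window-aligned start
            samples.addLast(current);
        }
        current.total += value;
    }

    double measure(long nowMs) {
        // Age out any sample whose data is older than NUM_SAMPLES full windows.
        while (!samples.isEmpty()
                && samples.peekFirst().startMs + NUM_SAMPLES * WINDOW_MS < nowMs) {
            samples.removeFirst();
        }
        if (samples.isEmpty()) return 0.0;
        double total = 0.0;
        for (Sample s : samples) total += s.total;
        long elapsedMs = nowMs - samples.peekFirst().startMs;
        return total / (elapsedMs / 1000.0); // per-second rate over surviving samples
    }

    public static void main(String[] args) {
        SampledRateDemo rate = new SampledRateDemo();
        for (long t = 0; t < 30_000; t += 1000) rate.record(999, t);    // 999/sec window
        for (long t = 30_000; t < 60_000; t += 1000) rate.record(1, t); // 1/sec window
        System.out.println(rate.measure(60_000)); // 500.0 - both samples still alive
        System.out.println(rate.measure(60_001)); // ~1.0 - the 999/sec sample was purged
    }
}
```

Measured exactly at the 60-second boundary, both samples are alive and the rate is 500 per sec; one millisecond later the 999/sec sample has been purged and the reported rate collapses to roughly 1 per sec, mirroring the behaviour the issue describes.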
[jira] [Commented] (KAFKA-2686) unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment
[ https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106501#comment-15106501 ] The Data Lorax commented on KAFKA-2686: --- Sorry for the late response - but thanks! > unsubscribe() call leaves KafkaConsumer in invalid state for manual > topic-partition assignment > -- > > Key: KAFKA-2686 > URL: https://issues.apache.org/jira/browse/KAFKA-2686 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: The Data Lorax >Assignee: Guozhang Wang > Fix For: 0.9.0.0 > > > The below code snippet demonstrates the problem. > Basically, the unsubscribe() call leaves the KafkaConsumer in a state that > means poll() will always return empty record sets, even if new > topic-partitions have been assigned that have messages pending. This is > because unsubscribe() sets SubscriptionState.needsPartitionAssignment to > true, and assign() does not clear this flag. The only thing that clears this > flag is when the consumer handles the response from a JoinGroup request. > {code} > final KafkaConsumer consumer = new KafkaConsumer<>(props); > consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1))); > ConsumerRecords records = consumer.poll(100);// <- Works, > returning records > consumer.unsubscribe(); // Puts consumer into invalid state. > consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2))); > records = consumer.poll(100);// <- Always returns empty record set. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer
[ https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15035665#comment-15035665 ] The Data Lorax commented on KAFKA-1894: --- I'm running into this issue and struggling to find a way around it - if the Kafka cluster is unavailable the KafkaConsumer.poll() call can block indefinitely - and does not even enter an interruptible state, which means there is no way of recovering, short of thread.stop(). Would be good to move this into a more imminent release or at least have the thread enter an interruptible state within the loop. > Avoid long or infinite blocking in the consumer > --- > > Key: KAFKA-1894 > URL: https://issues.apache.org/jira/browse/KAFKA-1894 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Jay Kreps >Assignee: Jason Gustafson > Fix For: 0.10.0.0 > > > The new consumer has a lot of loops that look something like > {code} > while(!isThingComplete()) > client.poll(); > {code} > This occurs both in KafkaConsumer but also in NetworkClient.completeAll. > These retry loops are actually mostly the behavior we want but there are > several cases where they may cause problems: > - In the case of a hard failure we may hang for a long time or indefinitely > before realizing the connection is lost. > - In the case where the cluster is malfunctioning or down we may retry > forever. > It would probably be better to give a timeout to these. The proposed approach > would be to add something like retry.time.ms=6 and only continue retrying > for that period of time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
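Pending a proper fix, one general-purpose way to bound the indefinite wait described in the comment above is to run the blocking call on a separate thread and cap the wait with {{Future.get(timeout)}}. This is only a sketch of the pattern, with a simulated blocking call standing in for poll(); it does not make the underlying call interruptible, so a truly stuck thread would still leak, exactly as the comment notes:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedBlockingCall {
    // Wraps a potentially never-returning call (like poll() against a dead
    // cluster) so the caller regains control after a timeout.
    static <T> T callWithTimeout(Callable<T> call, long timeoutMs) throws TimeoutException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Future<T> f = exec.submit(call);
            return f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            // Best effort: interrupts the worker, but an uninterruptible call
            // (as described in this issue) would still leak the thread.
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            // Simulate a call that never returns within the allotted time.
            callWithTimeout(() -> { Thread.sleep(60_000); return "unreachable"; }, 200);
        } catch (TimeoutException e) {
            System.out.println("timed out, caller can recover");
        }
    }
}
```

The caller at least regains control and can log, alert, or tear down the process, rather than hanging forever inside poll().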
[jira] [Updated] (KAFKA-2901) Extend ListGroups and DescribeGroup APIs to cover offsets
[ https://issues.apache.org/jira/browse/KAFKA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] The Data Lorax updated KAFKA-2901: -- Description: The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 allow admin tools to get details of consumer groups now that this information is not stored in ZK. The brokers also now store offset information for consumer groups. At present, there is no API for admin tools to discover the groups that brokers are storing offsets for. For example, if a consumer is using the new consumer api, is storing offsets in Kafka under the groupId 'Bob', but is using manual partition assignment, then the {{groupCache}} in the {{GroupMetadataManager}} will not contain any information about the group 'Bob'. However, the {{offsetCache}} in the {{GroupMetadataManager}} will contain information about 'Bob'. Currently the only way for admin tools to know the full set of groups being managed by Kafka, i.e. those storing offsets in Kafka, those using Kafka for balancing of consumer groups, and those using Kafka for both, is to consume the offset topic. We need to extend the List/Describe groups API to allow admin tools to discover 'Bob' and allow the partition offsets to be retrieved. was: The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 allow admin tools to get details of consumer groups now that this information is not stored in ZK. The brokers also now store offset information for consumer groups. At present, there is no API for admin tools to discover the groups that brokers are storing offsets for. For example, if a consumer is using the new consumer api, is storing offsets in Kafka under the groupId 'Bob', but is using manual partition assignment, then the {{groupCache}} in the {{GroupMetadataManager}} will not contain any information about the group 'Bob'. However, the {{offsetCache}} in the {{GroupMetadataManager}} will contain information about 'Bob'. 
We need to extend the List/Describe groups API to allow admin tools to discover 'Bob' and allow the partition offsets to be retrieved. > Extend ListGroups and DescribeGroup APIs to cover offsets > - > > Key: KAFKA-2901 > URL: https://issues.apache.org/jira/browse/KAFKA-2901 > Project: Kafka > Issue Type: Improvement > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: The Data Lorax >Assignee: Neha Narkhede > Fix For: 0.9.0.1 > > > The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 > allow admin tools to get details of consumer groups now that this information > is not stored in ZK. > The brokers also now store offset information for consumer groups. At > present, there is no API for admin tools to discover the groups that brokers > are storing offsets for. > For example, if a consumer is using the new consumer api, is storing offsets > in Kafka under the groupId 'Bob', but is using manual partition assignment, > then the {{groupCache}} in the {{GroupMetadataManager}} will not contain any > information about the group 'Bob'. However, the {{offsetCache}} in the > {{GroupMetadataManager}} will contain information about 'Bob'. > Currently the only way for admin tools to know the full set of groups being > managed by Kafka, i.e. those storing offsets in Kafka, those using Kafka for > balancing of consumer groups, and those using Kafka for both, is to consume > the offset topic. > We need to extend the List/Describe groups API to allow admin tools to > discover 'Bob' and allow the partition offsets to be retrieved. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
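The visibility gap described above (a manual-assignment group present in the offset cache but invisible to ListGroups) can be illustrated with a toy model. The collections below are plain stand-ins for the broker's internal state, not actual Kafka code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GroupVisibilityDemo {
    // Toy stand-ins for the GroupMetadataManager's two caches.
    static Set<String> groupCache = new HashSet<>();        // groups using Kafka's balancing
    static Map<String, Long> offsetCache = new HashMap<>(); // groupId -> last committed offset

    public static void main(String[] args) {
        // 'Alice' uses subscribe(): joins the group protocol AND commits offsets.
        groupCache.add("Alice");
        offsetCache.put("Alice", 42L);

        // 'Bob' uses manual assign() but still commits offsets to Kafka.
        offsetCache.put("Bob", 7L);

        // A ListGroups that only consults the group cache cannot see 'Bob':
        System.out.println(groupCache.contains("Bob"));     // false
        System.out.println(offsetCache.containsKey("Bob")); // true
    }
}
```

Extending the List/Describe APIs to also consult the offset information would make 'Bob' discoverable without consuming the offsets topic.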
[jira] [Created] (KAFKA-2901) Extend ListGroups and DescribeGroup APIs to cover offsets
The Data Lorax created KAFKA-2901: - Summary: Extend ListGroups and DescribeGroup APIs to cover offsets Key: KAFKA-2901 URL: https://issues.apache.org/jira/browse/KAFKA-2901 Project: Kafka Issue Type: Improvement Components: consumer Affects Versions: 0.9.0.0 Reporter: The Data Lorax Assignee: Neha Narkhede Fix For: 0.9.0.1 The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 allow admin tools to get details of consumer groups now that this information is not stored in ZK. The brokers also now store offset information for consumer groups. At present, there is no API for admin tools to discover the groups that brokers are storing offsets for. For example, if a consumer is using the new consumer api, is storing offsets in Kafka under the groupId 'Bob', but is using manual partition assignment, then the {{groupCache}} in the {{GroupMetadataManager}} will not contain any information about the group 'Bob'. However, the {{offsetCache}} in the {{GroupMetadataManager}} will contain information about 'Bob'. We need to extend the List/Describe groups API to allow admin tools to discover 'Bob' and allow the partition offsets to be retrieved. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2686) unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment
[ https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] The Data Lorax updated KAFKA-2686: -- Description: The below code snippet demonstrates the problem. Basically, the unsubscribe() call leaves the KafkaConsumer in a state that means poll() will always return empty record sets, even if new topic-partitions have been assigned that have messages pending. This is because unsubscribe() sets SubscriptionState.needsPartitionAssignment to true, and assign() does not clear this flag. The only thing that clears this flag is when the consumer handles the response from a JoinGroup request.
{code}
final KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
ConsumerRecords records = consumer.poll(100); // <- Works, returning records
consumer.unsubscribe(); // Puts consumer into invalid state.
consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
records = consumer.poll(100); // <- Always returns empty record set.
{code}
> unsubscribe() call leaves KafkaConsumer in invalid state for manual > topic-partition assignment > -- > > Key: KAFKA-2686 > URL: https://issues.apache.org/jira/browse/KAFKA-2686 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.9.0.0 >Reporter: The Data Lorax >Assignee: Neha Narkhede > > The below code snippet demonstrates the problem. > Basically, the unsubscribe() call leaves the KafkaConsumer in a state that > means poll() will always return empty record sets, even if new > topic-partitions have been assigned that have messages pending. This is > because unsubscribe() sets SubscriptionState.needsPartitionAssignment to > true, and assign() does not clear this flag. The only thing that clears this > flag is when the consumer handles the response from a JoinGroup request. 
> {code} > final KafkaConsumer consumer = new KafkaConsumer<>(props); > consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1))); > ConsumerRecords records = consumer.poll(100);// <- Works, > returning records > consumer.unsubscribe(); // Puts consumer into invalid state. > consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2))); > records = consumer.poll(100);// <- Always returns empty record set. > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
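The flag behaviour this issue describes can be sketched as a self-contained toy model. The class below is hypothetical, not Kafka's real SubscriptionState; it only mimics the reported interaction between unsubscribe(), assign(), and poll():

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnsubscribeBugDemo {
    static class ToySubscriptionState {
        Set<String> assignment = new HashSet<>();
        boolean needsPartitionAssignment = false;

        void assign(List<String> partitions) {
            assignment = new HashSet<>(partitions);
            // Bug as reported: the flag is NOT cleared here; only handling a
            // JoinGroup response clears it.
        }

        void unsubscribe() {
            assignment.clear();
            needsPartitionAssignment = true;
        }

        // poll() returns nothing while the consumer thinks it needs an assignment.
        List<String> poll() {
            return needsPartitionAssignment ? Collections.emptyList()
                                            : new ArrayList<>(assignment);
        }
    }

    public static void main(String[] args) {
        ToySubscriptionState state = new ToySubscriptionState();
        state.assign(Arrays.asList("topic-1"));
        System.out.println(state.poll()); // [topic-1]
        state.unsubscribe();
        state.assign(Arrays.asList("topic-2"));
        System.out.println(state.poll()); // [] - empty, despite the new assignment
    }
}
```

Once unsubscribe() flips the flag, no subsequent assign() call can undo it, which is why the real consumer keeps returning empty record sets.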
[jira] [Created] (KAFKA-2686) unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment
The Data Lorax created KAFKA-2686: - Summary: unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment Key: KAFKA-2686 URL: https://issues.apache.org/jira/browse/KAFKA-2686 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.9.0.0 Reporter: The Data Lorax Assignee: Neha Narkhede -- This message was sent by Atlassian JIRA (v6.3.4#6332)