[jira] [Commented] (KAFKA-3894) Log Cleaner thread crashes and never restarts

2016-06-23 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346583#comment-15346583
 ] 

The Data Lorax commented on KAFKA-3894:
---

[~ijuma] Not sure this is related. Our situation seems to be just a legitimately 
large number of records in the segment.

> Log Cleaner thread crashes and never restarts
> -
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: Oracle JDK 8
> Ubuntu Precise
>Reporter: Tim Carey-Smith
>  Labels: compaction
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be 
> too large to fit into the dedupe buffer. 
> The result of this is a log line: 
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner 
> \[kafka-log-cleaner-thread-0\], Error due to  
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in 
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit 
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where 
> cleaning of compacted topics is not running. 
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason 
> for this upper bound, or if this is a value which must be tuned for each 
> specific use-case. 
> Of more immediate concern is the fact that the thread crash is not visible 
> via JMX or exposed as some form of service degradation. 
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM
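
A note on the numbers in that log line: my understanding is that the offset map 
sizes to roughly log.cleaner.dedupe.buffer.size * load factor / 24 bytes per 
entry (a 16-byte MD5 of the key plus an 8-byte offset) - treat those constants 
as assumptions, but they do line up: the default 128MB buffer gives 
134217728 * 0.9 / 24 ≈ 5033164, exactly the capacity reported above. For the 
second remediation, here is a minimal sketch of the kind of in-JVM liveness 
check we mean, assuming the cleaner thread's name still starts with 
"kafka-log-cleaner-thread" as in the log line:

{code:java}
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class LogCleanerLivenessCheck {

    // Returns true while a live thread whose name starts with the cleaner
    // prefix exists in this JVM; must run inside (or attached to) the broker.
    public static boolean cleanerThreadAlive() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            if (info != null
                    && info.getThreadName().startsWith("kafka-log-cleaner-thread")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("log cleaner alive: " + cleanerThreadAlive());
    }
}
{code}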



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3894) Log Cleaner thread crashes and never restarts

2016-06-23 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15346580#comment-15346580
 ] 

The Data Lorax commented on KAFKA-3894:
---

Yep, we've run into the same issue.

It would be nice if the cleaner, at the very minimum, skipped segments with a 
large number of records.

> Log Cleaner thread crashes and never restarts
> -
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2, 0.9.0.1
> Environment: Oracle JDK 8
> Ubuntu Precise
>Reporter: Tim Carey-Smith
>  Labels: compaction
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be 
> too large to fit into the dedupe buffer. 
> The result of this is a log line: 
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner 
> \[kafka-log-cleaner-thread-0\], Error due to  
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in 
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit 
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where 
> cleaning of compacted topics is not running. 
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason 
> for this upper bound, or if this is a value which must be tuned for each 
> specific use-case. 
> Of more immediate concern is the fact that the thread crash is not visible 
> via JMX or exposed as some form of service degradation. 
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-03 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313773#comment-15313773
 ] 

The Data Lorax edited comment on KAFKA-3775 at 6/3/16 7:24 AM:
---

I wonder how such a design would maintain the level of resiliency currently 
offered? At the moment, in a running multi-process cluster, the other processes 
pick up the slack if one of them should fail. With the proposed design some 
partitions would remain without a consumer. This seems like a fundamental 
switch away from Kafka's current model, and a risky one IMHO.

Could you also elaborate on why settings such as 'max.poll.records' don't help 
stop your initial instance going pop? Maybe there are other alternative 
solutions here... 


was (Author: bigandy):
I wonder how surah a design would maintain the level of resiliency currently 
offered? At the moment, in a running multi-process cluster the other processes 
pick up the slack if one of them should fail. With the purposed design some 
partitions would remain with out a consumer. This seems like a fundamental 
switch away from Kafka's current model.

Could you also elaborate on why settings such as 'max.poll.records' don't help 
stop your initial instance going pop? Maybe there are other alternative 
solutions here... 

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions 
> and message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with 
> more than 10MB/sec of traffic on each partition, we saw that all partitions 
> were assigned to the first instance and soon the app died from OOM.
> I know there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, 
> so some users might want the option to start the first single instance and 
> check that it works as expected with a lesser number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure (a config sketch follows after this description).
>   => Maybe works, but that still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as 
> the instance accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to provide the maximum number of partitions that can feasibly be processed by 
> a single instance (or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to the processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue 
> working for its share of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "users may want the option to start the 
> first single instance and check that it works as expected with a lesser 
> number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will 
> continue refining it.
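
Regarding the second workaround above: concretely, the tuning would look 
something like the sketch below when building the streams configuration. The 
config constants are my best guesses at the matching StreamsConfig/ConsumerConfig 
names, and the values are purely illustrative, not recommendations:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class LowHeapStreamsConfig {

    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");         // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // hypothetical address

        // Fewer records buffered per partition lowers steady-state heap usage.
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);

        // Smaller fetches bound the memory held by each in-flight fetch response.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 256 * 1024);

        // Caps how many records a single poll() hands to the streams thread.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return props;
    }
}
{code}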



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-03 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313773#comment-15313773
 ] 

The Data Lorax commented on KAFKA-3775:
---

I wonder how such a design would maintain the level of resiliency currently 
offered? At the moment, in a running multi-process cluster the other processes 
pick up the slack if one of them should fail. With the proposed design some 
partitions would remain without a consumer. This seems like a fundamental 
switch away from Kafka's current model.

Could you also elaborate on why settings such as 'max.poll.records' don't help 
stop your initial instance going pop? Maybe there are other alternative 
solutions here... 

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of a single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions 
> and message traffic, it is a problem that we don't have a way of throttling 
> the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with 
> more than 10MB/sec of traffic on each partition, we saw that all partitions 
> were assigned to the first instance and soon the app died from OOM.
> I know there are some workarounds conceivable here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, 
> so some users might want the option to start the first single instance and 
> check that it works as expected with a lesser number of partitions (I want :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works, but that still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as 
> the instance accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles it's weird that 
> users don't have a way to control the maximum number of "partitions" assigned 
> to a single shard (an instance of KafkaStreams here). Users should be allowed 
> to provide the maximum number of partitions that can feasibly be processed by 
> a single instance (or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to the processId (which is the notion of a single 
> KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate incomplete assignment. That is, Kafka Streams should continue 
> working for its share of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "users may want the option to start the 
> first single instance and check that it works as expected with a lesser 
> number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will 
> continue refining it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed

2016-03-31 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219894#comment-15219894
 ] 

The Data Lorax commented on KAFKA-3456:
---

OK, I'm getting the feeling there is little appetite to change the way this 
works and people would rather stick with the known limitations of the metrics.  
That being the case I'm happy(ish) for this to be closed.  If on the other hand 
we'd prefer more correct metrics, then I'm happy to work on a PR...

> In-house KafkaMetric misreports metrics when periodically observed
> --
>
> Key: KAFKA-3456
> URL: https://issues.apache.org/jira/browse/KAFKA-3456
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: The Data Lorax
>Assignee: Neha Narkhede
>Priority: Minor
>
> The metrics captured by Kafka through the in-house {{SampledStat}} suffer 
> from misreporting metrics if observed in a periodic manner.
> Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
> sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, 
> to report this metric to some external system we might poll it every 60 
> seconds to observe the current value. Using a shorter period would, in the 
> case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the 
> case of a {{Count}}, would lead to double counting - so 60 seconds is the 
> only period at which we can poll the metrics if we are to report accurate 
> metrics.
> To demonstrate the issue consider the following somewhat extreme case:
> The {{Rate}}  is capturing data from a system which alternates between a 999 
> per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
> aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
> seconds the first sample within the {{Rate}} instance will have a rate of 999 
> per sec, and the second 1 per sec. 
> If we were to ask the metric for its value at this 60 second boundary it 
> would correctly report 500 per sec. However, if we asked it again 1 
> millisecond later it would report 1 per sec, as the first sample window has 
> been aged out. Depending on how far our periodic poll is offset into the 
> metric's 60 sec period, we would observe a constant rate somewhere in the 
> range of 1 to 500 per second, most likely around the 250 mark. 
> Other metrics based off of the {{SampledStat}} type suffer from the same 
> issue e.g. the {{Count}} metric, given a constant rate of 1 per second, will 
> report a constant count somewhere between 30 and 60, rather than the correct 
> 60.
> This can be seen in the following test code:
> {code:java}
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>     }
>
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>
>         // Measure at bucket boundary (though the same issue exists for all periodic measurements):
>         final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0
>
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>
>         // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>
>         // Measure second pair of samples:
>         final double m2 = rate.measure(metricsConfig, t(4)); // m2 = 1.0
>
>         assertEquals("Measurement of the rate over the first two samples", 500.0, m1, 2.0);
>         assertEquals("Measurement of the rate over the last two samples", 500.0, m2, 2.0);
>     }
>
>     @Test
>     public void testHowRateDropsMetricsWithRetardedObservations() throws Exception {
>         final long retardation = 1000;
>
>         Rate rate = new Rate();
>         metricsConfig.samples(2);

[jira] [Commented] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed

2016-03-26 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213062#comment-15213062
 ] 

The Data Lorax commented on KAFKA-3456:
---

Yep, increasing the number of samples certainly helps reduce the error. 
Likewise, with the n+1 model, increasing the samples reduces the latency.

I think the issue is not so much the variable time period as the fact that any 
data captured in the current sample after the periodic observation is never 
reported, as it's evicted before the next observation. Consider a Count metric 
with 2 samples within a 60s period that is being incremented every second. If 
we observe the metric halfway through a sample then the count will be 45, not 
60, and it will continue to be 45 if we keep observing the metric every minute. 
(A sketch reproducing this follows below.)

For me, I'd prefer correct delayed metrics instead of timely metrics I can't 
trust. And I'd prefer to be able to reduce the delay by using more samples than 
to be able to reduce, but not remove, the error.
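
For what it's worth, the 45-vs-60 behaviour is easy to reproduce in the style 
of the test code from the description - a minimal sketch, assuming {{Count}} 
accepts the same record/measure calls that {{Rate}} does there:

{code:java}
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.stats.Count;

public class CountObservationDemo {

    public static void main(String[] args) {
        MetricConfig config = new MetricConfig();
        config.samples(2);
        config.timeWindow(30, TimeUnit.SECONDS);

        Count count = new Count();

        // One event per second for 75 seconds, i.e. two and a half 30s windows.
        for (long time = 0; time < 75_000; time += 1000) {
            count.record(config, 1, time);
        }

        // Observing halfway through the third window: the first window (30
        // events) has been aged out, leaving 30 from the completed second
        // window plus 15 from the half-filled current one.
        System.out.println(count.measure(config, 75_000)); // ~45, not 60
    }
}
{code}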


> In-house KafkaMetric misreports metrics when periodically observed
> --
>
> Key: KAFKA-3456
> URL: https://issues.apache.org/jira/browse/KAFKA-3456
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: The Data Lorax
>Assignee: Neha Narkhede
>Priority: Minor
>
> The metrics captured by Kafka through the in-house {{SampledStat}} suffer 
> from misreporting metrics if observed in a periodic manner.
> Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
> sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, 
> to report this metric to some external system we might poll it every 60 
> seconds to observe the current value. Using a shorter period would, in the 
> case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the 
> case of a {{Count}}, would lead to double counting - so 60 seconds is the 
> only period at which we can poll the metrics if we are to report accurate 
> metrics.
> To demonstrate the issue consider the following somewhat extreme case:
> The {{Rate}}  is capturing data from a system which alternates between a 999 
> per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
> aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
> seconds the first sample within the {{Rate}} instance will have a rate of 999 
> per sec, and the second 1 per sec. 
> If we were to ask the metric for its value at this 60 second boundary it 
> would correctly report 500 per sec. However, if we asked it again 1 
> millisecond later it would report 1 per sec, as the first sample window has 
> been aged out. Depending on how far our periodic poll is offset into the 
> metric's 60 sec period, we would observe a constant rate somewhere in the 
> range of 1 to 500 per second, most likely around the 250 mark. 
> Other metrics based off of the {{SampledStat}} type suffer from the same 
> issue e.g. the {{Count}} metric, given a constant rate of 1 per second, will 
> report a constant count somewhere between 30 and 60, rather than the correct 
> 60.
> This can be seen in the following test code:
> {code:java}
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>     }
>
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>
>         // Measure at bucket boundary (though the same issue exists for all periodic measurements):
>         final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0
>
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>
>         // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>
>         //

[jira] [Commented] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed

2016-03-25 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15211669#comment-15211669
 ] 

The Data Lorax commented on KAFKA-3456:
---

I see your point here Jay. But surely in general we're not looking for the rate 
at the point of observation, but the rate over the monitored period, i.e. in 
this case 60 seconds. 

Also, if we switch to discussing Count instead of Rate, then I think the issue 
may be more apparent. Count suffers from the same issue. Again, I see the aim 
of a Count metric as being to count over the entire 60s period. Aggregating a 
60s periodic observation of the count should get pretty close to the true 
value. But with the current code the true value will likely be much higher than 
the reported one, especially in the presence of a volatile input profile.

> In-house KafkaMetric misreports metrics when periodically observed
> --
>
> Key: KAFKA-3456
> URL: https://issues.apache.org/jira/browse/KAFKA-3456
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: The Data Lorax
>Assignee: Neha Narkhede
>Priority: Minor
>
> The metrics captured by Kafka through the in-house {{SampledStat}} suffer 
> from misreporting metrics if observed in a periodic manner.
> Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
> sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, 
> to report this metric to some external system we might poll it every 60 
> seconds to observe the current value. Using a shorter period would, in the 
> case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the 
> case of a {{Count}}, would lead to double counting - so 60 seconds is the 
> only period at which we can poll the metrics if we are to report accurate 
> metrics.
> To demonstrate the issue consider the following somewhat extreme case:
> The {{Rate}}  is capturing data from a system which alternates between a 999 
> per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
> aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
> seconds the first sample within the {{Rate}} instance will have a rate of 999 
> per sec, and the second 1 per sec. 
> If we were to ask the metric for its value at this 60 second boundary it 
> would correctly report 500 per sec. However, if we asked it again 1 
> millisecond later it would report 1 per sec, as the first sample window has 
> been aged out. Depending on how far our periodic poll is offset into the 
> metric's 60 sec period, we would observe a constant rate somewhere in the 
> range of 1 to 500 per second, most likely around the 250 mark. 
> Other metrics based off of the {{SampledStat}} type suffer from the same 
> issue e.g. the {{Count}} metric, given a constant rate of 1 per second, will 
> report a constant count somewhere between 30 and 60, rather than the correct 
> 60.
> This can be seen in the following test code:
> {code:java}
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>     }
>
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>
>         // Measure at bucket boundary (though the same issue exists for all periodic measurements):
>         final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0
>
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>
>         // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>
>         // Measure second pair of samples:
>         final double m2 = rate.measure(metricsConfig, t(4)); // m2 = 1.0
>
>         assertEquals("Measurement of the rate over the first two samples", 500.0, m1, 2.0);
> 

[jira] [Commented] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed

2016-03-24 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15210060#comment-15210060
 ] 

The Data Lorax commented on KAFKA-3456:
---

[~aauradkar], I see you've made changes in this area - would be interested to 
hear your thoughts...

> In-house KafkaMetric misreports metrics when periodically observed
> --
>
> Key: KAFKA-3456
> URL: https://issues.apache.org/jira/browse/KAFKA-3456
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>Reporter: The Data Lorax
>Assignee: Neha Narkhede
>Priority: Minor
>
> The metrics captured by Kafka through the in-house {{SampledStat}} suffer 
> from misreporting metrics if observed in a periodic manner.
> Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
> sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, 
> to report this metric to some external system we might poll it every 60 
> seconds to observe the current value. Using a shorter period would, in the 
> case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the 
> case of a {{Count}}, would lead to double counting - so 60 seconds is the 
> only period at which we can poll the metrics if we are to report accurate 
> metrics.
> To demonstrate the issue consider the following somewhat extreme case:
> The {{Rate}}  is capturing data from a system which alternates between a 999 
> per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
> aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
> seconds the first sample within the {{Rate}} instance will have a rate of 999 
> per sec, and the second 1 per sec. 
> If we were to ask the metric for its value at this 60 second boundary it 
> would correctly report 500 per sec. However, if we asked it again 1 
> millisecond later it would report 1 per sec, as the first sample window has 
> been aged out. Depending on how far our periodic poll is offset into the 
> metric's 60 sec period, we would observe a constant rate somewhere in the 
> range of 1 to 500 per second, most likely around the 250 mark. 
> Other metrics based off of the {{SampledStat}} type suffer from the same 
> issue e.g. the {{Count}} metric, given a constant rate of 1 per second, will 
> report a constant count somewhere between 30 and 60, rather than the correct 
> 60.
> This can be seen in the following test code:
> {code:java}
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>     }
>
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>
>         // Measure at bucket boundary (though the same issue exists for all periodic measurements):
>         final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0
>
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>
>         // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>
>         // Measure second pair of samples:
>         final double m2 = rate.measure(metricsConfig, t(4)); // m2 = 1.0
>
>         assertEquals("Measurement of the rate over the first two samples", 500.0, m1, 2.0);
>         assertEquals("Measurement of the rate over the last two samples", 500.0, m2, 2.0);
>     }
>
>     @Test
>     public void testHowRateDropsMetricsWithRetardedObservations() throws Exception {
>         final long retardation = 1000;
>
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
> 

[jira] [Updated] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed

2016-03-24 Thread The Data Lorax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

The Data Lorax updated KAFKA-3456:
--
Description: 
The metrics captured by Kafka through the in-house {{SampledStat}} suffer from 
misreporting metrics if observed in a periodic manner.

Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, to 
report this metric to some external system we might poll it every 60 seconds to 
observe the current value. Using a shorter period would, in the case of a 
{{Rate}}, lead to smoothing of the plotted data, and worse, in the case of a 
{{Count}}, would lead to double counting - so 60 seconds is the only period at 
which we can poll the metrics if we are to report accurate metrics.

To demonstrate the issue consider the following somewhat extreme case:

The {{Rate}}  is capturing data from a system which alternates between a 999 
per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
seconds the first sample within the {{Rate}} instance will have a rate of 999 
per sec, and the second 1 per sec. 

If we were to ask the metric for its value at this 60 second boundary it would 
correctly report 500 per sec. However, if we asked it again 1 millisecond later 
it would report 1 per sec, as the first sample window has been aged out. 
Depending on how far our periodic poll is offset into the metric's 60 sec 
period, we would observe a constant rate somewhere in the range of 1 to 500 
per second, most likely around the 250 mark. 

Other metrics based off of the {{SampledStat}} type suffer from the same issue 
e.g. the {{Count}} metric, given a constant rate of 1 per second, will report a 
constant count somewhere between 30 and 60, rather than the correct 60.

This can be seen in the following test code:

{code:java}
public class MetricsTest {
    private MetricConfig metricsConfig;

    @Before
    public void setUp() throws Exception {
        metricsConfig = new MetricConfig();
    }

    private long t(final int bucket) {
        return metricsConfig.timeWindowMs() * bucket;
    }

    @Test
    public void testHowRateDropsMetrics() throws Exception {
        Rate rate = new Rate();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(30, TimeUnit.SECONDS);

        // First sample window from t0 -> (t1 -1), with rate 999 per second:
        for (long time = t(0); time != t(1); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Second sample window from t1 -> (t2 -1), with rate 1 per second:
        for (long time = t(1); time != t(2); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        // Measure at bucket boundary (though the same issue exists for all periodic measurements):
        final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0

        // Third sample window from t2 -> (t3 -1), with rate 999 per second:
        for (long time = t(2); time != t(3); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
        for (long time = t(3); time != t(4); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        // Measure second pair of samples:
        final double m2 = rate.measure(metricsConfig, t(4)); // m2 = 1.0

        assertEquals("Measurement of the rate over the first two samples", 500.0, m1, 2.0);
        assertEquals("Measurement of the rate over the last two samples", 500.0, m2, 2.0);
    }

    @Test
    public void testHowRateDropsMetricsWithRetardedObservations() throws Exception {
        final long retardation = 1000;

        Rate rate = new Rate();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(30, TimeUnit.SECONDS);

        // First sample window from t0 -> (t1 -1), with rate 999 per second:
        for (long time = t(0); time != t(1); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Second sample window from t1 -> (t2 -1), with rate 1 per second:
        for (long time = t(1); time != t(2); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        double m1 = 0.0;

        // Third sample window from t2 -> (t3 -1), with rate 999 per second:
        for (long time = t(2); time != t(3); time += 1000) {
            rate.record(metricsConfig, 999, time);

            if (time == t(2) + retardation) {
                m1 = rate.measure(metricsConfig, time); // m1 = 65.something
            }
        }

        // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
        for (long time = t(3); time != t(4); time += 1000) {

[jira] [Updated] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed

2016-03-24 Thread The Data Lorax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

The Data Lorax updated KAFKA-3456:
--
Description: 
The metrics captured by Kafka through the in-house {{SampledStat}} suffer from 
misreporting metrics if observed in a periodic manner.

Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, to 
report this metric to some external system we might poll it every 60 seconds to 
observe the current value. Using a shorter period would, in the case of a 
{{Rate}}, lead to smoothing of the plotted data, and worse, in the case of a 
{{Count}}, would lead to double counting - so 60 seconds is the only period at 
which we can poll the metrics if we are to report accurate metrics.

To demonstrate the issue consider the following somewhat extreme case:

The {{Rate}}  is capturing data from a system which alternates between a 999 
per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
seconds the first sample within the {{Rate}} instance will have a rate of 999 
per sec, and the second 1 per sec. 

If we were to ask the metric for its value at this 60 second boundary it would 
correctly report 500 per sec. However, if we asked it again 1 millisecond later 
it would report 1 per sec, as the first sample window has been aged out. 
Depending on how far our periodic poll is offset into the metric's 60 sec 
period, we would observe a constant rate somewhere in the range of 1 to 500 
per second, most likely around the 250 mark. 

Other metrics based off of the {{SampledStat}} type suffer from the same issue 
e.g. the {{Count}} metric, given a constant rate of 1 per second, will report a 
constant count somewhere between 30 and 60, rather than the correct 60.

This can be seen in the following test code:

{code:java}
public class MetricsTest {
    private MetricConfig metricsConfig;

    @Before
    public void setUp() throws Exception {
        metricsConfig = new MetricConfig();
    }

    private long t(final int bucket) {
        return metricsConfig.timeWindowMs() * bucket;
    }

    @Test
    public void testHowRateDropsMetrics() throws Exception {
        Rate rate = new Rate();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(30, TimeUnit.SECONDS);

        // First sample window from t0 -> (t1 -1), with rate 999 per second:
        for (long time = t(0); time != t(1); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Second sample window from t1 -> (t2 -1), with rate 1 per second:
        for (long time = t(1); time != t(2); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        // Measure at bucket boundary (though the same issue exists for all periodic measurements):
        final double m1 = rate.measure(metricsConfig, t(2)); // m1 = 1.0

        // Third sample window from t2 -> (t3 -1), with rate 999 per second:
        for (long time = t(2); time != t(3); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
        for (long time = t(3); time != t(4); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        // Measure second pair of samples:
        final double m2 = rate.measure(metricsConfig, t(4)); // m2 = 1.0

        assertEquals("Measurement of the rate over the first two samples", 500.0, m1, 2.0);
        assertEquals("Measurement of the rate over the last two samples", 500.0, m2, 2.0);
    }

    @Test
    public void testHowRateDropsMetricsWithRetardedObservations() throws Exception {
        final long retardation = 1000;

        Rate rate = new Rate();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(30, TimeUnit.SECONDS);

        // First sample window from t0 -> (t1 -1), with rate 999 per second:
        for (long time = t(0); time != t(1); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Second sample window from t1 -> (t2 -1), with rate 1 per second:
        for (long time = t(1); time != t(2); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        double m1 = 0.0;

        // Third sample window from t2 -> (t3 -1), with rate 999 per second:
        for (long time = t(2); time != t(3); time += 1000) {
            rate.record(metricsConfig, 999, time);

            if (time == t(2) + retardation) {
                m1 = rate.measure(metricsConfig, time); // m1 = 65.something
            }
        }

        // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
        for (long time = t(3); time != t(4); time += 1000) {

[jira] [Created] (KAFKA-3456) In-house KafkaMetric misreports metrics when periodically observed

2016-03-24 Thread The Data Lorax (JIRA)
The Data Lorax created KAFKA-3456:
-

 Summary: In-house KafkaMetric misreports metrics when periodically 
observed
 Key: KAFKA-3456
 URL: https://issues.apache.org/jira/browse/KAFKA-3456
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core, producer 
Affects Versions: 0.9.0.1, 0.9.0.0, 0.10.0.0
Reporter: The Data Lorax
Assignee: Neha Narkhede
Priority: Minor


The metrics captured by Kafka through the in-house {{SampledStat}} suffer from 
misreporting metrics if observed in a periodic manner.

Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, to 
report this metric to some external system we might poll it every 60 seconds to 
observe the current value. Using a shorter period would, in the case of a 
{{Rate}}, lead to smoothing of the plotted data, and worse, in the case of a 
{{Count}}, would lead to double counting - so 60 seconds is the only period at 
which we can poll the metrics if we are to report accurate metrics.

To demonstrate the issue consider the following somewhat extreme case:

The {{Rate}}  is capturing data from a system which alternates between a 999 
per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
seconds the first sample within the {{Rate}} instance will have a rate of 999 
per sec, and the second 1 per sec. 

If we were to ask the metric for its value at this 60 second boundary it would 
correctly report 500 per sec. However, if we asked it again 1 millisecond later 
it would report 1 per sec, as the first sample window has been aged out. 
Depending on how far our periodic poll is offset into the metric's 60 sec 
period, we would observe a constant rate somewhere in the range of 1 to 500 per 
second, most likely around the 250 mark. 

Other metrics based off of the {{SampledStat}} type suffer from the same issue 
e.g. the {{Count}} metric, given a constant rate of 1 per second, will report a 
constant count somewhere between 30 and 60, rather than the correct 60.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2686) unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment

2016-01-19 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15106501#comment-15106501
 ] 

The Data Lorax commented on KAFKA-2686:
---

Sorry for the late response - but thanks!

> unsubscribe() call leaves KafkaConsumer in invalid state for manual 
> topic-partition assignment
> --
>
> Key: KAFKA-2686
> URL: https://issues.apache.org/jira/browse/KAFKA-2686
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: The Data Lorax
>Assignee: Guozhang Wang
> Fix For: 0.9.0.0
>
>
> The below code snippet demonstrates the problem.
> Basically, the unsubscribe() call leaves the KafkaConsumer in a state that 
> means poll() will always return empty record sets, even if new 
> topic-partitions have been assigned that have messages pending.  This is 
> because unsubscribe() sets SubscriptionState.needsPartitionAssignment to 
> true, and assign() does not clear this flag. The only thing that clears this 
> flag is when the consumer handles the response from a JoinGroup request.
> {code}
> final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
> ConsumerRecords<String, String> records = consumer.poll(100); // <- Works, returning records
> consumer.unsubscribe();   // Puts consumer into invalid state.
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
> records = consumer.poll(100); // <- Always returns empty record set.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1894) Avoid long or infinite blocking in the consumer

2015-12-02 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15035665#comment-15035665
 ] 

The Data Lorax commented on KAFKA-1894:
---

I'm running into this issue and struggling to find a way around it - if the 
Kafka cluster is unavailable, the KafkaConsumer.poll() call can block 
indefinitely, and does not even enter an interruptible state, which means 
there is no way of recovering, short of thread.stop().

It would be good to move this into a more imminent release, or at least have 
the thread enter an interruptible state within the loop. (A partial workaround 
sketch follows below.)
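
For what it's worth, the one call that is documented as safe to make from 
another thread is KafkaConsumer.wakeup(), which causes poll() to throw 
WakeupException. I can't say whether it unblocks every code path this ticket 
covers, so treat the sketch below as a partial workaround at best; the class, 
method and deadline are illustrative only:

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GuardedPoll {

    // Polls once, but arranges for wakeup() to fire if poll() is still blocked
    // after deadlineMs; shutdownNow() cancels the watchdog if poll() returns
    // in time.
    public static <K, V> ConsumerRecords<K, V> pollWithDeadline(
            KafkaConsumer<K, V> consumer, long deadlineMs) {
        ScheduledExecutorService watchdog = Executors.newSingleThreadScheduledExecutor();
        watchdog.schedule(consumer::wakeup, deadlineMs, TimeUnit.MILLISECONDS);
        try {
            return consumer.poll(100);
        } catch (WakeupException e) {
            return ConsumerRecords.empty();
        } finally {
            watchdog.shutdownNow();
        }
    }
}
{code}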

> Avoid long or infinite blocking in the consumer
> ---
>
> Key: KAFKA-1894
> URL: https://issues.apache.org/jira/browse/KAFKA-1894
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>Assignee: Jason Gustafson
> Fix For: 0.10.0.0
>
>
> The new consumer has a lot of loops that look something like
> {code}
>   while(!isThingComplete())
> client.poll();
> {code}
> This occurs both in KafkaConsumer and in NetworkClient.completeAll. 
> These retry loops are actually mostly the behavior we want but there are 
> several cases where they may cause problems:
>  - In the case of a hard failure we may hang for a long time or indefinitely 
> before realizing the connection is lost.
>  - In the case where the cluster is malfunctioning or down we may retry 
> forever.
> It would probably be better to give a timeout to these. The proposed approach 
> would be to add something like retry.time.ms=6 and only continue retrying 
> for that period of time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2901) Extend ListGroups and DescribeGroup APIs to cover offsets

2015-11-27 Thread The Data Lorax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

The Data Lorax updated KAFKA-2901:
--
Description: 
The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 allow 
admin tools to get details of consumer groups now that this information is not 
stored in ZK.

The brokers also now store offset information for consumer groups. At present, 
there is no API for admin tools to discover the groups that brokers are storing 
offsets for.

For example, if a consumer is using the new consumer api, is storing offsets in 
Kafka under the groupId 'Bob', but is using manual partition assignment, then 
the {{groupCache}} in the {{GroupMetadataManager}} will not contain any 
information about the group 'Bob'. However, the {{offsetCache}} in the 
{{GroupMetadataManager}} will contain information about 'Bob'.

Currently the only way for admin tools to know the full set of groups being 
managed by Kafka, i.e. those storing offsets in Kafka, those using Kafka for 
balancing of consumer groups, and those using Kafka for both, is to consume the 
offset topic.

We need to extend the List/Describe groups API to allow admin tools to discover 
'Bob' and allow the partition offsets to be retrieved.

  was:
The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 allow 
admin tools to get details of consumer groups now that this information is not 
stored in ZK.

The brokers also now store offset information for consumer groups. At present, 
there is no API for admin tools to discover the groups that brokers are storing 
offsets for.

For example, if a consumer is using the new consumer api, is storing offsets in 
Kafka under the groupId 'Bob', but is using manual partition assignment, then 
the {{groupCache}} in the {{GroupMetadataManager}} will not contain any 
information about the group 'Bob'. However, the {{offsetCache}} in the 
{{GroupMetadataManager}} will contain information about 'Bob'.

We need to extend the List/Describe groups API to allow admin tools to discover 
'Bob' and allow the partition offsets to be retrieved.


> Extend ListGroups and DescribeGroup APIs to cover offsets
> -
>
> Key: KAFKA-2901
> URL: https://issues.apache.org/jira/browse/KAFKA-2901
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: The Data Lorax
>Assignee: Neha Narkhede
> Fix For: 0.9.0.1
>
>
> The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 
> allow admin tools to get details of consumer groups now that this information 
> is not stored in ZK.
> The brokers also now store offset information for consumer groups. At 
> present, there is no API for admin tools to discover the groups that brokers 
> are storing offsets for.
> For example, if a consumer is using the new consumer api, is storing offsets 
> in Kafka under the groupId 'Bob', but is using manual partition assignment, 
> then the {{groupCache}} in the {{GroupMetadataManager}} will not contain any 
> information about the group 'Bob'. However, the {{offsetCache}} in the 
> {{GroupMetadataManager}} will contain information about 'Bob'.
> Currently the only way for admin tools to know the full set of groups being 
> managed by Kafka, i.e. those storing offsets in Kafka, those using Kafka for 
> balancing of consumer groups, and those using Kafka for both, is to consume 
> the offset topic.
> We need to extend the List/Describe groups API to allow admin tools to 
> discover 'Bob' and allow the partition offsets to be retrieved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2901) Extend ListGroups and DescribeGroup APIs to cover offsets

2015-11-27 Thread The Data Lorax (JIRA)
The Data Lorax created KAFKA-2901:
-

 Summary: Extend ListGroups and DescribeGroup APIs to cover offsets
 Key: KAFKA-2901
 URL: https://issues.apache.org/jira/browse/KAFKA-2901
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 0.9.0.0
Reporter: The Data Lorax
Assignee: Neha Narkhede
 Fix For: 0.9.0.1


The {{ListGroupsRequest}} and {{DescribeGroupsRequest}} added to 0.9.0.0 allow 
admin tools to get details of consumer groups now that this information is not 
stored in ZK.

The brokers also now store offset information for consumer groups. At present, 
there is no API for admin tools to discover the groups that brokers are storing 
offsets for.

For example, if a consumer is using the new consumer api, is storing offsets in 
Kafka under the groupId 'Bob', but is using manual partition assignment, then 
the {{groupCache}} in the {{GroupMetadataManager}} will not contain any 
information about the group 'Bob'. However, the {{offsetCache}} in the 
{{GroupMetadataManager}} will contain information about 'Bob'.

We need to extend the List/Describe groups API to allow admin tools to discover 
'Bob' and allow the partition offsets to be retrieved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2686) unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment

2015-10-22 Thread The Data Lorax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

The Data Lorax updated KAFKA-2686:
--
Description: 
The below code snippet demonstrates the problem.

Basically, the unsubscribe() call leaves the KafkaConsumer in a state that 
means poll() will always return empty record sets, even if new topic-partitions 
have been assigned that have messages pending.  This is because unsubscribe() 
sets SubscriptionState.needsPartitionAssignment to true, and assign() does not 
clear this flag. The only thing that clears this flag is when the consumer 
handles the response from a JoinGroup request.

{code}
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
ConsumerRecords<String, String> records = consumer.poll(100); // <- Works, returning records

consumer.unsubscribe();   // Puts consumer into invalid state.

consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
records = consumer.poll(100); // <- Always returns empty record set.
{code}
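
Until the flag handling is fixed, a possible workaround, assuming the flag is 
only ever set by unsubscribe() as described above, is to never call 
unsubscribe() between manual assignments and let assign() replace the previous 
topic-partitions wholesale:

{code:java}
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
ConsumerRecords<String, String> records = consumer.poll(100);

// No unsubscribe() here: assign() replaces the assignment outright, so the
// needsPartitionAssignment flag is never left set.
consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
records = consumer.poll(100); // <- returns records for partition 2
{code}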



> unsubscribe() call leaves KafkaConsumer in invalid state for manual 
> topic-partition assignment
> --
>
> Key: KAFKA-2686
> URL: https://issues.apache.org/jira/browse/KAFKA-2686
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.0
>Reporter: The Data Lorax
>Assignee: Neha Narkhede
>
> The below code snippet demonstrates the problem.
> Basically, the unsubscribe() call leaves the KafkaConsumer in a state that 
> means poll() will always return empty record sets, even if new 
> topic-partitions have been assigned that have messages pending.  This is 
> because unsubscribe() sets SubscriptionState.needsPartitionAssignment to 
> true, and assign() does not clear this flag. The only thing that clears this 
> flag is when the consumer handles the response from a JoinGroup request.
> {code}
> final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 1)));
> ConsumerRecords<String, String> records = consumer.poll(100); // <- Works, returning records
> consumer.unsubscribe();   // Puts consumer into invalid state.
> consumer.assign(Collections.singletonList(new TopicPartition(topicName, 2)));
> records = consumer.poll(100); // <- Always returns empty record set.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2686) unsubscribe() call leaves KafkaConsumer in invalid state for manual topic-partition assignment

2015-10-22 Thread The Data Lorax (JIRA)
The Data Lorax created KAFKA-2686:
-

 Summary: unsubscribe() call leaves KafkaConsumer in invalid state 
for manual topic-partition assignment
 Key: KAFKA-2686
 URL: https://issues.apache.org/jira/browse/KAFKA-2686
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.9.0.0
Reporter: The Data Lorax
Assignee: Neha Narkhede






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)