[jira] [Resolved] (KAFKA-9538) Flaky Test `testResetOffsetsExportImportPlan`

2020-02-12 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9538.

Resolution: Fixed

> Flaky Test `testResetOffsetsExportImportPlan`
> -
>
> Key: KAFKA-9538
> URL: https://issues.apache.org/jira/browse/KAFKA-9538
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
>
> {code}
> 19:44:41 
> 19:44:41 kafka.admin.ResetConsumerGroupOffsetTest > testResetOffsetsExportImportPlan FAILED
> 19:44:41 java.lang.AssertionError: expected: 2, bar2-1 -> 2)> but was:
> 19:44:41 at org.junit.Assert.fail(Assert.java:89)
> 19:44:41 at org.junit.Assert.failNotEquals(Assert.java:835)
> 19:44:41 at org.junit.Assert.assertEquals(Assert.java:120)
> 19:44:41 at org.junit.Assert.assertEquals(Assert.java:146)
> 19:44:41 at kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan(ResetConsumerGroupOffsetTest.scala:429)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9538) Flaky Test `testResetOffsetsExportImportPlan`

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035959#comment-17035959
 ] 

ASF GitHub Bot commented on KAFKA-9538:
---

hachikuji commented on pull request #6561: KAFKA-9538: Flaky test: 
testResetOffsetsExportImportPlan
URL: https://github.com/apache/kafka/pull/6561
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org







[jira] [Commented] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean

2020-02-12 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035930#comment-17035930
 ] 

Ismael Juma commented on KAFKA-9504:


I believe this was fixed in the 2.4 branch (and later).

> Memory leak in KafkaMetrics registered to MBean
> ---
>
> Key: KAFKA-9504
> URL: https://issues.apache.org/jira/browse/KAFKA-9504
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: Andreas Holmén
>Priority: Major
>
> After close() is called on a KafkaConsumer, some registered MBeans are not 
> unregistered, causing a leak.
>  
>  
> {code:java}
> import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
>
> import java.lang.management.ManagementFactory;
> import java.util.HashMap;
> import java.util.Map;
> import javax.management.MBeanServer;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class Leaker {
>   private static String bootstrapServers = "hostname:9092";
>
>   public static void main(String[] args) throws InterruptedException {
>     MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
>     Map<String, Object> props = new HashMap<>();
>     props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>
>     // Count registered MBeans before and after creating and closing 100 consumers.
>     int beans = mBeanServer.getMBeanCount();
>     for (int i = 0; i < 100; i++) {
>       KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
>           props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
>       consumer.close();
>     }
>     int newBeans = mBeanServer.getMBeanCount();
>     System.out.println("\nbeans delta: " + (newBeans - beans));
>   }
> }
> {code}
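
The reproduction above only reports how many MBeans are left over. To see which names remain registered, one could diff the set of ObjectNames before and after, as in the rough sketch below. It uses only the JDK's JMX API; the class MBeanDiff, the helper namesAddedSince, the stand-in Dummy MBean and the "example.leak" domain are all hypothetical names made up for illustration, chosen so the sketch runs without a Kafka dependency.

```java
import java.lang.management.ManagementFactory;
import java.util.HashSet;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanDiff {
    // Hypothetical stand-in MBean so the sketch runs without a Kafka dependency.
    public interface DummyMBean {
        int getValue();
    }

    public static class Dummy implements DummyMBean {
        @Override
        public int getValue() {
            return 1;
        }
    }

    /** Returns the names registered now that are absent from the earlier snapshot. */
    static Set<ObjectName> namesAddedSince(MBeanServer server, Set<ObjectName> before) {
        Set<ObjectName> added = new HashSet<>(server.queryNames(null, null));
        added.removeAll(before);
        return added;
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> before = new HashSet<>(server.queryNames(null, null));

        // Simulate a leak: register an MBean and "forget" to unregister it.
        ObjectName leaked = new ObjectName("example.leak:type=consumer-metrics,client-id=demo");
        server.registerMBean(new Dummy(), leaked);

        for (ObjectName name : namesAddedSince(server, before)) {
            System.out.println("still registered: " + name);
        }

        server.unregisterMBean(leaked); // clean up the demo bean
    }
}
```

In the Leaker program, the same snapshot-and-diff could be taken around the loop that creates and closes consumers, in place of the two getMBeanCount() calls.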
>  
>  





[jira] [Comment Edited] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean

2020-02-12 Thread Junyong Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035929#comment-17035929
 ] 

Junyong Li edited comment on KAFKA-9504 at 2/13/20 5:31 AM:


I may have found the cause:
KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, 
"consumer-metrics", and SelectorMetrics calls metrics.removeMetric in its 
close(). Because KafkaConsumerMetrics does not delete the metrics it added 
during initialization, metrics.removeMetric eventually reaches 
JmxReporter.java:101, which re-registers the already deleted MBean named 
"consumer-metrics".

Maybe we can change the metricGrpName of SelectorMetrics to 
"consumer-selector-metrics", or adjust the order in which components are closed 
in KafkaConsumer's close() so that Utils.closeQuietly(metrics ... comes after 
AppInfoParser.unregisterAppInfo(...


was (Author: lijy83):
I may have found the cause:
KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, 
"consumer-metrics", and SelectorMetrics calls metrics.removeMetric in its 
close(). Because KafkaConsumerMetrics does not delete the metrics it added 
during initialization, SelectorMetrics's close() eventually reaches 
JmxReporter.java:101, which re-registers the already deleted MBean named 
"consumer-metrics".

Maybe we can change the metricGrpName of SelectorMetrics to 
"consumer-selector-metrics", or adjust the order in which components are closed 
in KafkaConsumer's close() so that Utils.closeQuietly(metrics ... comes after 
AppInfoParser.unregisterAppInfo(...






[jira] [Comment Edited] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean

2020-02-12 Thread Junyong Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035929#comment-17035929
 ] 

Junyong Li edited comment on KAFKA-9504 at 2/13/20 5:29 AM:


I may have found the cause:
KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, 
"consumer-metrics", and SelectorMetrics calls metrics.removeMetric in its 
close(). Because KafkaConsumerMetrics does not delete the metrics it added 
during initialization, SelectorMetrics's close() eventually reaches 
JmxReporter.java:101, which re-registers the already deleted MBean named 
"consumer-metrics".

Maybe we can change the metricGrpName of SelectorMetrics to 
"consumer-selector-metrics", or adjust the order in which components are closed 
in KafkaConsumer's close() so that Utils.closeQuietly(metrics ... comes after 
AppInfoParser.unregisterAppInfo(...


was (Author: lijy83):
I may have found the cause:
KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, 
"consumer-metrics", and SelectorMetrics calls metrics.removeMetric in its 
close(). Because KafkaConsumerMetrics does not delete the metrics it added 
during initialization, this eventually reaches JmxReporter.java:101, which 
re-registers the already deleted MBean named "consumer-metrics".

Maybe we can change the metricGrpName of SelectorMetrics to 
"consumer-selector-metrics", or adjust the order in which components are closed 
in KafkaConsumer's close() so that Utils.closeQuietly(metrics ... comes after 
AppInfoParser.unregisterAppInfo(...






[jira] [Commented] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean

2020-02-12 Thread Junyong Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035929#comment-17035929
 ] 

Junyong Li commented on KAFKA-9504:
---

I may have found the cause:
KafkaConsumerMetrics and SelectorMetrics use the same metricGrpName, 
"consumer-metrics", and SelectorMetrics calls metrics.removeMetric in its 
close(). Because KafkaConsumerMetrics does not delete the metrics it added 
during initialization, this eventually reaches JmxReporter.java:101, which 
re-registers the already deleted MBean named "consumer-metrics".

Maybe we can change the metricGrpName of SelectorMetrics to 
"consumer-selector-metrics", or adjust the order in which components are closed 
in KafkaConsumer's close() so that Utils.closeQuietly(metrics ... comes after 
AppInfoParser.unregisterAppInfo(...






[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-12 Thread Michael Viamari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035927#comment-17035927
 ] 

Michael Viamari commented on KAFKA-9533:


Sure. I can take a look at it. Before I get started:

1) How should I think about the case where someone might unintentionally (or 
intentionally) be relying on the buggy behavior? Is that something I should 
handle in code, or is it handled elsewhere?

2) I noticed that KStream#transform uses an adaptor to conform to 
KStream#flatTransform. Should I do something similar for 
KStream#transformValues and KStream#flatTransformValues, since I will be 
modifying that area of code already?

> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]).
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}
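
For illustration only, the documented behavior (a null result is not forwarded) could look roughly like the sketch below. It is not the Kafka Streams code or an agreed fix: the class NullSkippingForwarder and the method processAll are hypothetical, a java.util.function.BiFunction stands in for the ValueTransformer, and the returned list stands in for calls to context.forward.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class NullSkippingForwarder {
    /**
     * Applies the transformer to each value and collects only non-null results.
     * The returned list stands in for calls to context.forward(key, result).
     */
    static <K, V, R> List<R> processAll(K key, List<V> values, BiFunction<K, V, R> transformer) {
        List<R> forwarded = new ArrayList<>();
        for (V value : values) {
            R result = transformer.apply(key, value);
            if (result != null) { // skip nulls, as the transformValues javadoc describes
                forwarded.add(result);
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        List<String> out = processAll("k", Arrays.asList("a", "", "b"),
            (key, value) -> value.isEmpty() ? null : value.toUpperCase());
        System.out.println(out); // prints [A, B]; the empty input produced null and was skipped
    }
}
```

Whether existing applications rely on nulls being forwarded is a separate compatibility question that this sketch does not address.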





[jira] [Assigned] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2020-02-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9545:
--

Assignee: Boyang Chen

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}





[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2020-02-12 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035922#comment-17035922
 ] 

zhangzhisheng commented on KAFKA-6834:
--

On kafka2.12_0.11.0.3 the log cleaner thread stopped and threw an exception, e.g.:
{code:java}
java.lang.IllegalStateException: This log contains a message larger than 
maximum allowable size of 100.
{code}
 

> log cleaner should handle the case when the size of a message set is larger 
> than the max message size
> -
>
> Key: KAFKA-6834
> URL: https://issues.apache.org/jira/browse/KAFKA-6834
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.0.0
>
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the 
> per topic message size to be written to the log during log cleaning. However, 
> the buffer size in the log cleaner is still bounded by the per topic message 
> size. This can cause the log cleaner to die and cause the broker to run out 
> of disk space.
>  





[jira] [Commented] (KAFKA-9441) Refactor commit logic

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035875#comment-17035875
 ] 

ASF GitHub Bot commented on KAFKA-9441:
---

mjsax commented on pull request #8105: KAFKA-9441: Add internal StreamsProducer
URL: https://github.com/apache/kafka/pull/8105
 
 
   Upfront refactoring for KIP-447.
   
   Introduces `StreamsProducer`, which allows sharing a producer across multiple 
tasks and tracking the TX status.
   
   Call for review @guozhangwang @abbccdda 
   
 



> Refactor commit logic
> -
>
> Key: KAFKA-9441
> URL: https://issues.apache.org/jira/browse/KAFKA-9441
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> With a producer per thread in combination with EOS, it is no longer 
> possible to commit individual tasks independently (as is done currently).
> We need to refactor StreamsThread to commit all tasks at the same time for 
> the new model.





[jira] [Comment Edited] (KAFKA-9541) Flaky Test DescribeConsumerGroupTest#testDescribeGroupMembersWithShortInitializationTimeout

2020-02-12 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035028#comment-17035028
 ] 

huxihx edited comment on KAFKA-9541 at 2/13/20 2:14 AM:


Occasionally the captured exception is DisconnectException instead of 
TimeoutException. That might be due to an unexpectedly long pause that caused 
the node to disconnect.


was (Author: huxi_2b):
Occasionally the captured exception is DisconnectedException instead of 
TimeoutException. That might be due to an unexpectedly long pause that caused 
the node to disconnect.

> Flaky Test 
> DescribeConsumerGroupTest#testDescribeGroupMembersWithShortInitializationTimeout
> ---
>
> Key: KAFKA-9541
> URL: https://issues.apache.org/jira/browse/KAFKA-9541
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.4.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Major
>
> h3. Error Message
> java.lang.AssertionError: assertion failed
> h3. Stacktrace
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:267)
>   at kafka.admin.DescribeConsumerGroupTest.testDescribeGroupMembersWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:630)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>   at jdk.internal.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 

[jira] [Commented] (KAFKA-7061) Enhanced log compaction

2020-02-12 Thread Senthilnathan Muthusamy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035829#comment-17035829
 ] 

Senthilnathan Muthusamy commented on KAFKA-7061:


Opened PR - rebased with the latest apache/trunk

[https://github.com/apache/kafka/pull/8103]

> Enhanced log compaction
> ---
>
> Key: KAFKA-7061
> URL: https://issues.apache.org/jira/browse/KAFKA-7061
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.5.0
>Reporter: Luis Cabral
>Assignee: Senthilnathan Muthusamy
>Priority: Major
>  Labels: kip
>
> Enhance log compaction to support more than just offset comparison, so that 
> insertion order alone does not dictate which records are kept.
> The default behavior is unchanged; the enhanced approach has to be 
> explicitly activated.
> The enhanced compaction is done either via the record timestamp, by setting 
> the new configuration to "timestamp", or via the record headers, by setting 
> this configuration to anything other than the default "offset" or the 
> reserved "timestamp".
> See 
> [KIP-280|https://cwiki.apache.org/confluence/display/KAFKA/KIP-280%3A+Enhanced+log+compaction]
>  for more details.
> +From Guozhang:+ We should emphasize on the wiki that the newly introduced 
> config yields to the existing "log.cleanup.policy", i.e. if the latter's 
> value is `delete` rather than `compact`, then the new config is ignored.
> +From Jun Rao:+ With the timestamp/header strategy, the behavior of the 
> application may need to change. In particular, the application can't just 
> blindly take the record with the larger offset and assume that it's the value 
> to keep. It needs to check the timestamp or the header now. So it would be 
> useful to at least document this.
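
As a rough illustration of the difference between the default "offset" strategy and the "timestamp" strategy described above, here is a toy in-memory sketch. It is not the log cleaner's implementation: the class CompactionSketch, the Rec type and the methods compact and supersedes are hypothetical, and falling back to the offset when timestamps are equal is an assumption made for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    /** Hypothetical minimal record: key, offset, timestamp, value. */
    static final class Rec {
        final String key;
        final long offset;
        final long timestamp;
        final String value;

        Rec(String key, long offset, long timestamp, String value) {
            this.key = key;
            this.offset = offset;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Decides whether candidate should replace current for the same key. */
    static boolean supersedes(Rec candidate, Rec current, boolean byTimestamp) {
        if (byTimestamp && candidate.timestamp != current.timestamp) {
            return candidate.timestamp > current.timestamp;
        }
        // Default strategy, and (as an assumption here) the tie-break for equal timestamps.
        return candidate.offset > current.offset;
    }

    /** Keeps one record per key according to the chosen strategy. */
    static Map<String, Rec> compact(List<Rec> log, boolean byTimestamp) {
        Map<String, Rec> kept = new LinkedHashMap<>();
        for (Rec rec : log) {
            Rec current = kept.get(rec.key);
            if (current == null || supersedes(rec, current, byTimestamp)) {
                kept.put(rec.key, rec);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // The second record has the larger offset but the older timestamp.
        List<Rec> log = Arrays.asList(
            new Rec("k", 0, 200, "newer-by-timestamp"),
            new Rec("k", 1, 100, "later-offset"));
        System.out.println(compact(log, false).get("k").value); // prints later-offset
        System.out.println(compact(log, true).get("k").value);  // prints newer-by-timestamp
    }
}
```

The two printed values differ, which is the situation Jun Rao's note warns about: a consumer that assumes the larger offset always wins would pick the wrong record under the timestamp strategy.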





[jira] [Commented] (KAFKA-7061) Enhanced log compaction

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035828#comment-17035828
 ] 

ASF GitHub Bot commented on KAFKA-7061:
---

senthilm-ms commented on pull request #7528: KAFKA-7061: KIP-280 Enhanced log 
compaction
URL: https://github.com/apache/kafka/pull/7528
 
 
   
 








[jira] [Created] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2020-02-12 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9545:
--

 Summary: Flaky Test 
`RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
 Key: KAFKA-9545
 URL: https://issues.apache.org/jira/browse/KAFKA-9545
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Jason Gustafson


https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/

{code}
java.lang.AssertionError: Condition not met within timeout 15000. Stream tasks not updated
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
{code}





[jira] [Updated] (KAFKA-9475) Replace transaction abortion scheduler with a delayed queue

2020-02-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9475:
---
Labels:   (was: help-wanted)

> Replace transaction abortion scheduler with a delayed queue
> ---
>
> Key: KAFKA-9475
> URL: https://issues.apache.org/jira/browse/KAFKA-9475
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> Although we could set the txn timeout to 10 seconds, the purging 
> scheduler only runs once per minute, so in the worst case we would 
> still wait for 1 minute. We are considering several potential fixes:
>  # Change the interval to 10 seconds: this means 6X more frequent checking and 
> more read contention on txn metadata. The benefit is an easy one-line 
> fix without correctness concerns.
>  # Use an existing delayed queue, a.k.a. purgatory. From what I heard, the 
> purgatory needs at least 2 extra threads to work properly, with some added 
> overhead for memory and complexity. The benefit is a more precise timeout 
> reaction, without a redundant full metadata read lock.
>  # Create a new delayed queue. This could be done with a Scala delayed 
> queue; the concern is whether this approach is production ready. 
> The benefits are the same as 2, with potentially less code complexity.
> This ticket is to track progress on #2 if we eventually decide to go down this 
> path.
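
To make the delayed-queue idea in the ticket above concrete, here is a minimal sketch. It uses the JDK's `java.util.concurrent.DelayQueue` as a stand-in; this is neither Kafka's purgatory nor the Scala queue the ticket mentions, and the names `TxnTimeout`, `TxnTimeoutQueue`, `register`, and `awaitNextExpired` are hypothetical. The point is only that each timeout fires close to its own deadline instead of waiting for the next periodic scan.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names come from Kafka's transaction coordinator.
final class TxnTimeout implements Delayed {
    final String transactionalId;
    private final long deadlineNanos; // absolute deadline on the System.nanoTime() clock

    TxnTimeout(String transactionalId, long timeoutMs) {
        this.transactionalId = transactionalId;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time; zero or negative means the timeout has expired.
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof TxnTimeout) {
            return Long.compare(deadlineNanos, ((TxnTimeout) other).deadlineNanos);
        }
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class TxnTimeoutQueue {
    private final DelayQueue<TxnTimeout> queue = new DelayQueue<>();

    void register(String transactionalId, long timeoutMs) {
        queue.add(new TxnTimeout(transactionalId, timeoutMs));
    }

    // Blocks until the earliest deadline has passed, then returns that transaction's id.
    String awaitNextExpired() {
        try {
            return queue.take().transactionalId;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a timeout", e);
        }
    }

    public static void main(String[] args) {
        TxnTimeoutQueue timeouts = new TxnTimeoutQueue();
        timeouts.register("txn-slow", 200);
        timeouts.register("txn-fast", 50);
        System.out.println(timeouts.awaitNextExpired()); // the 50 ms entry expires first
        System.out.println(timeouts.awaitNextExpired());
    }
}
```

In this sketch a single thread blocked in `awaitNextExpired()` replaces the periodic full scan, which is the trade-off the ticket describes: more precise reaction at the cost of an extra thread and queue bookkeeping.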





[jira] [Resolved] (KAFKA-9204) ReplaceField transformation fails when encountering tombstone event

2020-02-12 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9204.
--
Fix Version/s: 2.4.1
   2.3.2
   2.5.0
   2.2.3
 Reviewer: Randall Hauch
   Resolution: Fixed

Merged to the `trunk`, `2.5`, `2.4`, `2.3`, and `2.2` branches.

> ReplaceField transformation fails when encountering tombstone event
> ---
>
> Key: KAFKA-9204
> URL: https://issues.apache.org/jira/browse/KAFKA-9204
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Georgios Kalogiros
>Priority: Major
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> When applying the {{ReplaceField}} transformation to a tombstone event, an 
> exception is raised:
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:506)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.DataException: Only Map objects 
> supported in absence of schema for [field replacement], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.ReplaceField.applySchemaless(ReplaceField.java:134)
>   at 
> org.apache.kafka.connect.transforms.ReplaceField.apply(ReplaceField.java:127)
>   at 
> org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   ... 14 more
> {code}
> A fix for a similar bug in the InsertField transformation was merged 
> recently:
> https://issues.apache.org/jira/browse/KAFKA-8523
>  
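
The stack trace above shows the transform rejecting a `null` value. As a rough illustration of what tombstone handling can look like, the sketch below passes a `null` value through untouched before doing any field work. It is a self-contained stand-in built on plain `java.util.Map`, not the Kafka Connect `ReplaceField` class, and it is not taken from the actual fix; the class name `TombstoneSafeRename` and its `apply` method are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a schemaless field-renaming transform;
// this is not the Kafka Connect ReplaceField implementation.
final class TombstoneSafeRename {
    private final Map<String, String> renames;

    TombstoneSafeRename(Map<String, String> renames) {
        this.renames = new HashMap<>(renames);
    }

    // Returns null unchanged for a tombstone (null value); otherwise returns a renamed copy.
    Map<String, Object> apply(Map<String, Object> value) {
        if (value == null) {
            return null; // tombstone: nothing to rename, so pass it through
        }
        Map<String, Object> updated = new HashMap<>();
        for (Map.Entry<String, Object> entry : value.entrySet()) {
            String newName = renames.getOrDefault(entry.getKey(), entry.getKey());
            updated.put(newName, entry.getValue());
        }
        return updated;
    }
}
```

The guard runs before any "value must be a Map" requirement, so a tombstone never reaches the check that raised the `DataException` in the trace.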





[jira] [Updated] (KAFKA-9475) Replace transaction abortion scheduler with a delayed queue

2020-02-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9475:
---
Labels: help-wanted  (was: )

> Replace transaction abortion scheduler with a delayed queue
> ---
>
> Key: KAFKA-9475
> URL: https://issues.apache.org/jira/browse/KAFKA-9475
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: help-wanted
>
> Although we could set the txn timeout to 10 seconds, the purging 
> scheduler only runs once per minute, so in the worst case we would 
> still wait for 1 minute. We are considering several potential fixes:
>  # Change the interval to 10 seconds: this means 6X more frequent checking and 
> more read contention on txn metadata. The benefit is an easy one-line 
> fix without correctness concerns.
>  # Use an existing delayed queue, a.k.a. purgatory. From what I heard, the 
> purgatory needs at least 2 extra threads to work properly, with some added 
> overhead for memory and complexity. The benefit is a more precise timeout 
> reaction, without a redundant full metadata read lock.
>  # Create a new delayed queue. This could be done with a Scala delayed 
> queue; the concern is whether this approach is production ready. 
> The benefits are the same as 2, with potentially less code complexity.
> This ticket is to track progress on #2 if we eventually decide to go down this 
> path.





[jira] [Assigned] (KAFKA-9475) Replace transaction abortion scheduler with a delayed queue

2020-02-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9475:
--

Assignee: (was: Boyang Chen)

> Replace transaction abortion scheduler with a delayed queue
> ---
>
> Key: KAFKA-9475
> URL: https://issues.apache.org/jira/browse/KAFKA-9475
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted
>
> Although we could set the txn timeout to 10 seconds, the purging 
> scheduler only runs once per minute, so in the worst case we would 
> still wait for 1 minute. We are considering several potential fixes:
>  # Change the interval to 10 seconds: this means 6X more frequent checking and 
> more read contention on txn metadata. The benefit is an easy one-line 
> fix without correctness concerns.
>  # Use an existing delayed queue, a.k.a. purgatory. From what I heard, the 
> purgatory needs at least 2 extra threads to work properly, with some added 
> overhead for memory and complexity. The benefit is a more precise timeout 
> reaction, without a redundant full metadata read lock.
>  # Create a new delayed queue. This could be done with a Scala delayed 
> queue; the concern is whether this approach is production ready. 
> The benefits are the same as 2, with potentially less code complexity.
> This ticket is to track progress on #2 if we eventually decide to go down this 
> path.





[jira] [Commented] (KAFKA-9544) Flaky Test `KafkaAdminClientTest.testDefaultApiTimeoutOverride`

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035767#comment-17035767
 ] 

ASF GitHub Bot commented on KAFKA-9544:
---

hachikuji commented on pull request #8101: KAFKA-9544; Fix flaky test 
`AdminClientTest.testDefaultApiTimeoutOverride`
URL: https://github.com/apache/kafka/pull/8101
 
 
   There is a race condition with the backoff sleep in the test case and 
setting the next allowed send time in the AdminClient. To fix it, we allow the 
test case to do the backoff sleep multiple times if needed.
   
   This was fairly easy to reproduce prior to this fix. With the fix, I could 
not reproduce the problem after 500 runs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
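
The fix described in the pull request above (letting the test repeat the backoff sleep when one sleep is not enough) can be sketched roughly as follows. This is an illustration under assumptions, not the code from the pull request: `ManualClock` is a hypothetical hand-rolled clock rather than Kafka's own mock time utility, and `sleepUntil` is a hypothetical helper.

```java
import java.util.function.BooleanSupplier;

// Hypothetical manual clock for tests; time only moves when sleep() is called.
final class ManualClock {
    private long nowMs = 0;

    synchronized long milliseconds() {
        return nowMs;
    }

    synchronized void sleep(long ms) {
        nowMs += ms;
    }
}

final class RepeatedBackoffSketch {
    // Advances the clock by backoffMs until `done` holds or maxSleeps is reached.
    // Returns the number of sleeps performed.
    static int sleepUntil(ManualClock clock, long backoffMs, int maxSleeps, BooleanSupplier done) {
        int sleeps = 0;
        while (!done.getAsBoolean() && sleeps < maxSleeps) {
            clock.sleep(backoffMs);
            sleeps++;
        }
        return sleeps;
    }
}
```

A single fixed sleep assumes the other thread has already recorded its next allowed send time; looping on the observable condition removes that ordering assumption, and the `maxSleeps` bound keeps a broken test from spinning forever.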
   
 



> Flaky Test `KafkaAdminClientTest.testDefaultApiTimeoutOverride`
> ---
>
> Key: KAFKA-9544
> URL: https://issues.apache.org/jira/browse/KAFKA-9544
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> {code}
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Thread.join(Thread.java:1260)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient.close(KafkaAdminClient.java:594)
>   at org.apache.kafka.clients.admin.Admin.close(Admin.java:98)
>   at org.apache.kafka.clients.admin.Admin.close(Admin.java:81)
>   at 
> org.apache.kafka.clients.admin.AdminClientUnitTestEnv.close(AdminClientUnitTestEnv.java:116)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClientTest.testApiTimeout(KafkaAdminClientTest.java:2642)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClientTest.testDefaultApiTimeoutOverride(KafkaAdminClientTest.java:2595)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}





[jira] [Created] (KAFKA-9544) Flaky Test `KafkaAdminClientTest.testDefaultApiTimeoutOverride`

2020-02-12 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9544:
--

 Summary: Flaky Test 
`KafkaAdminClientTest.testDefaultApiTimeoutOverride`
 Key: KAFKA-9544
 URL: https://issues.apache.org/jira/browse/KAFKA-9544
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


{code}
org.junit.runners.model.TestTimedOutException: test timed out after 12 
milliseconds
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at 
org.apache.kafka.clients.admin.KafkaAdminClient.close(KafkaAdminClient.java:594)
at org.apache.kafka.clients.admin.Admin.close(Admin.java:98)
at org.apache.kafka.clients.admin.Admin.close(Admin.java:81)
at 
org.apache.kafka.clients.admin.AdminClientUnitTestEnv.close(AdminClientUnitTestEnv.java:116)
at 
org.apache.kafka.clients.admin.KafkaAdminClientTest.testApiTimeout(KafkaAdminClientTest.java:2642)
at 
org.apache.kafka.clients.admin.KafkaAdminClientTest.testDefaultApiTimeoutOverride(KafkaAdminClientTest.java:2595)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
{code}





[jira] [Commented] (KAFKA-9204) ReplaceField transformation fails when encountering tombstone event

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035764#comment-17035764
 ] 

ASF GitHub Bot commented on KAFKA-9204:
---

rhauch commented on pull request #7731: KAFKA-9204: allow ReplaceField SMT to 
handle tombstone records
URL: https://github.com/apache/kafka/pull/7731
 
 
   
 



> ReplaceField transformation fails when encountering tombstone event
> ---
>
> Key: KAFKA-9204
> URL: https://issues.apache.org/jira/browse/KAFKA-9204
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Georgios Kalogiros
>Priority: Major
>
> When applying the {{ReplaceField}} transformation to a tombstone event, an 
> exception is raised:
>  
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:506)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.DataException: Only Map objects 
> supported in absence of schema for [field replacement], found: null
>   at 
> org.apache.kafka.connect.transforms.util.Requirements.requireMap(Requirements.java:38)
>   at 
> org.apache.kafka.connect.transforms.ReplaceField.applySchemaless(ReplaceField.java:134)
>   at 
> org.apache.kafka.connect.transforms.ReplaceField.apply(ReplaceField.java:127)
>   at 
> org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   ... 14 more
> {code}
> A fix for a similar bug in the InsertField transformation was merged 
> recently:
> https://issues.apache.org/jira/browse/KAFKA-8523
>  





[jira] [Resolved] (KAFKA-9192) NullPointerException if field in schema not present in value

2020-02-12 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9192.
--
Fix Version/s: 2.4.1
   2.3.2
   2.5.0
   2.2.3
 Reviewer: Randall Hauch
   Resolution: Fixed

Merged to the `trunk`, `2.5`, `2.4`, `2.3`, and `2.2` branches.

> NullPointerException if field in schema not present in value
> 
>
> Key: KAFKA-9192
> URL: https://issues.apache.org/jira/browse/KAFKA-9192
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
>Reporter: Mark Tinsley
>Priority: Major
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> Given a message:
> {code:java}
> {
>    "schema":{
>       "type":"struct",
>       "fields":[
>          {
>             "type":"string",
>             "optional":true,
>             "field":"abc"
>          }
>       ],
>       "optional":false,
>       "name":"foobar"
>    },
>    "payload":{
>    }
> }
> {code}
> Given that the field is optional, I would expect the JsonConverter to still 
> process this value. 
> Instead, I get a null pointer exception. The stack trace points to this 
> line: 
> https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L701
>  called by 
> https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L181
> The issue seems to be that we need to check whether the jsonValue is null 
> before checking whether the jsonValue has a null value.
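
The ordering problem described above (an absent field yields a Java `null` reference, which must be tested before asking the value whether it represents JSON null) can be shown with a small self-contained sketch. It deliberately avoids Jackson and the real `JsonConverter`; `JsonValueStub`, `OptionalFieldSketch`, and `convertField` are hypothetical names used only for illustration.

```java
import java.util.Map;

// Hypothetical stand-in for a parsed JSON node; not a Jackson class.
final class JsonValueStub {
    private final Object value;

    JsonValueStub(Object value) {
        this.value = value;
    }

    // Models an explicit JSON null such as {"abc": null}.
    boolean isNull() {
        return value == null;
    }

    Object value() {
        return value;
    }
}

final class OptionalFieldSketch {
    // Returns the field's value, or null when an optional field is absent or explicitly null.
    static Object convertField(Map<String, JsonValueStub> payload, String field, boolean optional) {
        JsonValueStub jsonValue = payload.get(field); // Java null when the field is absent
        // Check the reference first; calling isNull() on an absent field would throw an NPE.
        if (jsonValue == null || jsonValue.isNull()) {
            if (optional) {
                return null;
            }
            throw new IllegalArgumentException("Missing required field: " + field);
        }
        return jsonValue.value();
    }
}
```

With the empty `payload` from the message above and `optional` set to true, this sketch returns `null` for `abc` instead of throwing.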





[jira] [Resolved] (KAFKA-9417) Integration test for new EOS model with vanilla Producer and Consumer

2020-02-12 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9417.
--
Resolution: Fixed

> Integration test for new EOS model with vanilla Producer and Consumer
> -
>
> Key: KAFKA-9417
> URL: https://issues.apache.org/jira/browse/KAFKA-9417
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We would like to extend the `TransactionMessageCopier` to use the new 
> subscription mode consumer and do a system test based on that in order to 
> verify that the new semantics actually work.
> We also want to make sure that backward compatibility is maintained by using 
> the group metadata API in existing tests as well.
>  
> A minor public change is also included in this PR: the default of 
> `transaction.abort.timed.out.transaction.cleanup.interval.ms` is set to 
> 10000 ms (10 seconds) on the broker side.





[jira] [Updated] (KAFKA-9417) Integration test for new EOS model with vanilla Producer and Consumer

2020-02-12 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-9417:
-
Fix Version/s: 2.5.0

> Integration test for new EOS model with vanilla Producer and Consumer
> -
>
> Key: KAFKA-9417
> URL: https://issues.apache.org/jira/browse/KAFKA-9417
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.5.0
>
>
> We would like to extend the `TransactionMessageCopier` to use the new 
> subscription mode consumer and do a system test based on that in order to 
> verify that the new semantics actually work.
> We also want to make sure that backward compatibility is maintained by using 
> the group metadata API in existing tests as well.
>  
> A minor public change is also included in this PR: the default of 
> `transaction.abort.timed.out.transaction.cleanup.interval.ms` is set to 
> 10000 ms (10 seconds) on the broker side.





[jira] [Commented] (KAFKA-9192) NullPointerException if field in schema not present in value

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035732#comment-17035732
 ] 

ASF GitHub Bot commented on KAFKA-9192:
---

rhauch commented on pull request #7733: KAFKA-9192: fix NPE when for converting 
optional json schema in structs
URL: https://github.com/apache/kafka/pull/7733
 
 
   
 



> NullPointerException if field in schema not present in value
> 
>
> Key: KAFKA-9192
> URL: https://issues.apache.org/jira/browse/KAFKA-9192
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
>Reporter: Mark Tinsley
>Priority: Major
>
> Given a message:
> {code:java}
> {
>    "schema":{
>       "type":"struct",
>       "fields":[
>          {
>             "type":"string",
>             "optional":true,
>             "field":"abc"
>          }
>       ],
>       "optional":false,
>       "name":"foobar"
>    },
>    "payload":{
>    }
> }
> {code}
> Given that the field is optional, I would expect the JsonConverter to still 
> process this value. 
> Instead, I get a null pointer exception. The stack trace points to this 
> line: 
> https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L701
>  called by 
> https://github.com/apache/kafka/blob/2.1/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L181
> The issue seems to be that we need to check whether the jsonValue is null 
> before checking whether the jsonValue has a null value.





[jira] [Commented] (KAFKA-9521) Flaky Test DeleteConsumerGroupsTest#testDeleteCmdEmptyGroup

2020-02-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035693#comment-17035693
 ] 

Matthias J. Sax commented on KAFKA-9521:


Different test method but same error message. I am not creating a new ticket 
because we already have many for different methods of this test, and they all 
seem to be related.

[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/DeleteConsumerGroupsTest/testDeleteWithMixOfSuccessAndError/]

> Flaky Test DeleteConsumerGroupsTest#testDeleteCmdEmptyGroup
> ---
>
> Key: KAFKA-9521
> URL: https://issues.apache.org/jira/browse/KAFKA-9521
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> Failed for me locally. Lost the actual stack trace/output but the error was 
> "java.lang.AssertionError: The group did not become empty as expected"





[jira] [Updated] (KAFKA-9521) Flaky Test DeleteConsumerGroupsTest#testDeleteCmdEmptyGroup

2020-02-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9521:
---
Component/s: unit tests

> Flaky Test DeleteConsumerGroupsTest#testDeleteCmdEmptyGroup
> ---
>
> Key: KAFKA-9521
> URL: https://issues.apache.org/jira/browse/KAFKA-9521
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> Failed for me locally. Lost the actual stack trace/output but the error was 
> "java.lang.AssertionError: The group did not become empty as expected"





[jira] [Commented] (KAFKA-9530) Flaky Test kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout

2020-02-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035692#comment-17035692
 ] 

Matthias J. Sax commented on KAFKA-9530:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupWithShortInitializationTimeout/]

> Flaky Test 
> kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout
> 
>
> Key: KAFKA-9530
> URL: https://issues.apache.org/jira/browse/KAFKA-9530
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: flaky-test, test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4570/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupWithShortInitializationTimeout/]
>  
> {noformat}
> Error Message
> java.lang.AssertionError: assertion failed
> Stacktrace
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:267)
>   at 
> kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:585)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at jdk.internal.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> 

[jira] [Commented] (KAFKA-9538) Flaky Test `testResetOffsetsExportImportPlan`

2020-02-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035691#comment-17035691
 ] 

Matthias J. Sax commented on KAFKA-9538:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/ResetConsumerGroupOffsetTest/testResetOffsetsAllTopicsAllGroups/]

> Flaky Test `testResetOffsetsExportImportPlan`
> -
>
> Key: KAFKA-9538
> URL: https://issues.apache.org/jira/browse/KAFKA-9538
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
>
> {code}
> 19:44:41 
> 19:44:41 kafka.admin.ResetConsumerGroupOffsetTest > 
> testResetOffsetsExportImportPlan FAILED
> 19:44:41 java.lang.AssertionError: expected: 2, bar2-1 -> 
> 2)> but was:
> 19:44:41 at org.junit.Assert.fail(Assert.java:89)
> 19:44:41 at org.junit.Assert.failNotEquals(Assert.java:835)
> 19:44:41 at org.junit.Assert.assertEquals(Assert.java:120)
> 19:44:41 at org.junit.Assert.assertEquals(Assert.java:146)
> 19:44:41 at 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExportImportPlan(ResetConsumerGroupOffsetTest.scala:429)
> {code}





[jira] [Commented] (KAFKA-9541) Flaky Test DescribeConsumerGroupTest#testDescribeGroupMembersWithShortInitializationTimeout

2020-02-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035690#comment-17035690
 ] 

Matthias J. Sax commented on KAFKA-9541:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupMembersWithShortInitializationTimeout/]

> Flaky Test 
> DescribeConsumerGroupTest#testDescribeGroupMembersWithShortInitializationTimeout
> ---
>
> Key: KAFKA-9541
> URL: https://issues.apache.org/jira/browse/KAFKA-9541
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.4.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Major
>
> h3. Error Message
> java.lang.AssertionError: assertion failed
> h3. Stacktrace
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:267)
>   at kafka.admin.DescribeConsumerGroupTest.testDescribeGroupMembersWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:630)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>  at jdk.internal.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>  at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>  at jdk.internal.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> 

[jira] [Commented] (KAFKA-8110) Flaky Test DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions

2020-02-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035689#comment-17035689
 ] 

Matthias J. Sax commented on KAFKA-8110:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/659/testReport/junit/kafka.admin/DeleteConsumerGroupsTest/testDeleteCmdWithMixOfSuccessAndError/]

> Flaky Test 
> DescribeConsumerGroupTest#testDescribeMembersWithConsumersWithoutAssignedPartitions
> --
>
> Key: KAFKA-8110
> URL: https://issues.apache.org/jira/browse/KAFKA-8110
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/67/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeMembersWithConsumersWithoutAssignedPartitions/]
> {quote}java.lang.AssertionError: Partition [__consumer_offsets,0] metadata 
> not propagated after 15000 ms at 
> kafka.utils.TestUtils$.fail(TestUtils.scala:381) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:791) at 
> kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:880) at 
> kafka.utils.TestUtils$.$anonfun$createTopic$3(TestUtils.scala:318) at 
> kafka.utils.TestUtils$.$anonfun$createTopic$3$adapted(TestUtils.scala:317) at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) at 
> scala.collection.immutable.Range.foreach(Range.scala:158) at 
> scala.collection.TraversableLike.map(TraversableLike.scala:237) at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:230) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:108) at 
> kafka.utils.TestUtils$.createTopic(TestUtils.scala:317) at 
> kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:375) at 
> kafka.admin.DescribeConsumerGroupTest.testDescribeMembersWithConsumersWithoutAssignedPartitions(DescribeConsumerGroupTest.scala:372){quote}
> STDOUT
> {quote}[2019-03-14 20:01:52,347] WARN Ignoring unexpected runtime exception 
> (org.apache.zookeeper.server.NIOServerCnxnFactory:236) 
> java.nio.channels.CancelledKeyException at 
> sun.nio.ch.SelectionKeyImpl.ensureValid(SelectionKeyImpl.java:73) at 
> sun.nio.ch.SelectionKeyImpl.readyOps(SelectionKeyImpl.java:87) at 
> org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:205)
>  at java.lang.Thread.run(Thread.java:748) TOPIC PARTITION CURRENT-OFFSET 
> LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID foo 0 0 0 0 - - - TOPIC 
> PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID foo 0 
> 0 0 0 - - - COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS 
> localhost:44669 (0){quote}





[jira] [Commented] (KAFKA-9417) Integration test for new EOS model with vanilla Producer and Consumer

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035688#comment-17035688
 ] 

ASF GitHub Bot commented on KAFKA-9417:
---

guozhangwang commented on pull request #8000: KAFKA-9417: New Integration Test 
for KIP-447
URL: https://github.com/apache/kafka/pull/8000
 
 
   
 



> Integration test for new EOS model with vanilla Producer and Consumer
> -
>
> Key: KAFKA-9417
> URL: https://issues.apache.org/jira/browse/KAFKA-9417
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We would like to extend the `TransactionMessageCopier` to use the new 
> subscription mode consumer and do a system test based on that in order to 
> verify that the new semantics actually work.
> We also want to make sure that backward compatibility is maintained by using 
> the group metadata API in existing tests as well.
>  
> A minor public change is also included in this PR: the default of 
> `transaction.abort.timed.out.transaction.cleanup.interval.ms` is set to 
> 10000 ms (10 seconds) on the broker side.





[jira] [Commented] (KAFKA-9530) Flaky Test kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout

2020-02-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035684#comment-17035684
 ] 

Matthias J. Sax commented on KAFKA-9530:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4672/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupWithShortInitializationTimeout/]

> Flaky Test 
> kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout
> 
>
> Key: KAFKA-9530
> URL: https://issues.apache.org/jira/browse/KAFKA-9530
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: flaky-test, test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4570/testReport/junit/kafka.admin/DescribeConsumerGroupTest/testDescribeGroupWithShortInitializationTimeout/]
>  
> {noformat}
> Error Messagejava.lang.AssertionError: assertion 
> failedStacktracejava.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:267)
>   at 
> kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout(DescribeConsumerGroupTest.scala:585)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at jdk.internal.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> 

[jira] [Resolved] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional

2020-02-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6607.

Fix Version/s: 2.5.0
   Resolution: Fixed

> Kafka Streams lag not zero when input topic transactional
> -
>
> Key: KAFKA-6607
> URL: https://issues.apache.org/jira/browse/KAFKA-6607
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 2.5.0
>
>
> When an input topic for a Kafka Streams application is written using 
> transactions, Kafka Streams commits an "incorrect" offset, i.e., it commits 
> "lastProcessedMessageOffset + 1", which is smaller than "endOffset" when it 
> reaches the end of the topic. The reason is the commit marker, which is the 
> last "message" in the topic; Streams does not take commit markers into account 
> when committing.
> This is not a correctness issue, but when one inspects the consumer lag via 
> {{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
> correct from the consumer-group tool's point of view.
> Note that all applications using a plain consumer may face the same issue if 
> they use `KafkaConsumer#commitSync(Map<TopicPartition, OffsetAndMetadata> 
> offsets)`. To address the issue, the correct pattern is to either commit 
> "nextRecord.offset()" (if the next record is already available, i.e., was 
> returned by `poll()`) or use `consumer.position()`, which takes the commit 
> marker into account and "steps over it".
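
The offset arithmetic in the description can be sketched with a small model. This is only an illustration, not Kafka client code: the `log` list, the `lag` helper, and the offsets used are hypothetical, and the model assumes a single transaction whose commit marker is the last entry in the partition.

```python
# Hypothetical model of one partition written by a transactional producer.
# Data records occupy offsets 0..4; the commit marker takes offset 5.
log = ["record"] * 5 + ["commit-marker"]

end_offset = len(log)  # log end offset as the broker would report it

# Offsets of the records the application actually receives (markers are skipped).
delivered = [offset for offset, kind in enumerate(log) if kind == "record"]
last_processed = delivered[-1]

# Pattern the issue calls "incorrect": commit lastProcessedMessageOffset + 1.
naive_commit = last_processed + 1

# Model of the position after the last poll: the next offset to fetch,
# which has already stepped over any trailing commit marker.
position = last_processed + 1
while position < len(log) and log[position] == "commit-marker":
    position += 1


def lag(committed):
    """Lag as a consumer-group tool would compute it: end offset minus committed offset."""
    return end_offset - committed


print(lag(naive_commit))  # 1
print(lag(position))      # 0
```

In this model the naive commit points at the commit marker itself, which is why the reported lag stays at 1 even though every data record has been processed.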





[jira] [Updated] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional

2020-02-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6607:
---
Description: 
When an input topic for a Kafka Streams application is written using 
transactions, Kafka Streams commits an "incorrect" offset, i.e., it commits 
"lastProcessedMessageOffset + 1", which is smaller than "endOffset" when it 
reaches the end of the topic. The reason is the commit marker, which is the 
last "message" in the topic; Streams does not take commit markers into account 
when committing.

This is not a correctness issue, but when one inspects the consumer lag via 
{{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
correct from the consumer-group tool's point of view.

Note that all applications using a plain consumer may face the same issue if 
they use `KafkaConsumer#commitSync(Map<TopicPartition, OffsetAndMetadata> 
offsets)`. To address the issue, the correct pattern is to either commit 
"nextRecord.offset()" (if the next record is already available, i.e., was 
returned by `poll()`) or use `consumer.position()`, which takes the commit 
marker into account and "steps over it".

  was:
When an input topic for a Kafka Streams application is written using 
transactions, Kafka Streams commits an "incorrect" offset, i.e., it commits 
"lastProcessedMessageOffset + 1", which is smaller than "endOffset" when it 
reaches the end of the topic. The reason is the commit marker, which is the 
last "message" in the topic; Streams does not take commit markers into account 
when committing.

This is not a correctness issue, but when one inspects the consumer lag via 
{{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
correct from the consumer-group tool's point of view.


> Kafka Streams lag not zero when input topic transactional
> -
>
> Key: KAFKA-6607
> URL: https://issues.apache.org/jira/browse/KAFKA-6607
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> When an input topic for a Kafka Streams application is written using 
> transactions, Kafka Streams commits an "incorrect" offset, i.e., it commits 
> "lastProcessedMessageOffset + 1", which is smaller than "endOffset" when it 
> reaches the end of the topic. The reason is the commit marker, which is the 
> last "message" in the topic; Streams does not take commit markers into account 
> when committing.
> This is not a correctness issue, but when one inspects the consumer lag via 
> {{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
> correct from the consumer-group tool's point of view.
> Note that all applications using a plain consumer may face the same issue if 
> they use `KafkaConsumer#commitSync(Map<TopicPartition, OffsetAndMetadata> 
> offsets)`. To address the issue, the correct pattern is to either commit 
> "nextRecord.offset()" (if the next record is already available, i.e., was 
> returned by `poll()`) or use `consumer.position()`, which takes the commit 
> marker into account and "steps over it".





[jira] [Updated] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional

2020-02-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6607:
---
Description: 
When an input topic for a Kafka Streams application is written using 
transactions, Kafka Streams commits an "incorrect" offset, i.e., it commits 
"lastProcessedMessageOffset + 1", which is smaller than "endOffset" when it 
reaches the end of the topic. The reason is the commit marker, which is the 
last "message" in the topic; Streams does not take commit markers into account 
when committing.

This is not a correctness issue, but when one inspects the consumer lag via 
{{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
correct from the consumer-group tool's point of view.

  was:
When an input topic for a Consumer or Kafka Streams application is written 
using transactions, the client does not commit "endOffset" but "endOffset - 1" 
(or to be more precise, "lastProcessedMessageOffset + 1") if it reaches the end 
of the topic. The reason is the commit marker, which is the last "message" in 
the topic; Streams commits "offset of last processed message plus 1" and does 
not take commit markers into account.

This is not a correctness issue, but when one inspects the consumer lag via 
{{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
correct from the consumer-group tool's point of view.


> Kafka Streams lag not zero when input topic transactional
> -
>
> Key: KAFKA-6607
> URL: https://issues.apache.org/jira/browse/KAFKA-6607
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> When an input topic for a Kafka Streams application is written using 
> transactions, Kafka Streams commits an "incorrect" offset, i.e., it commits 
> "lastProcessedMessageOffset + 1", which is smaller than "endOffset" when it 
> reaches the end of the topic. The reason is the commit marker, which is the 
> last "message" in the topic; Streams does not take commit markers into account 
> when committing.
> This is not a correctness issue, but when one inspects the consumer lag via 
> {{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
> correct from the consumer-group tool's point of view.





[jira] [Updated] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional

2020-02-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6607:
---
Component/s: (was: clients)

> Kafka Streams lag not zero when input topic transactional
> -
>
> Key: KAFKA-6607
> URL: https://issues.apache.org/jira/browse/KAFKA-6607
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> When an input topic for a Consumer or Kafka Streams application is written 
> using transactions, the client does not commit "endOffset" but "endOffset - 1" 
> (or to be more precise, "lastProcessedMessageOffset + 1") if it reaches the 
> end of the topic. The reason is the commit marker, which is the last "message" 
> in the topic; Streams commits "offset of last processed message plus 1" and 
> does not take commit markers into account.
> This is not a correctness issue, but when one inspects the consumer lag via 
> {{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
> correct from the consumer-group tool's point of view.





[jira] [Updated] (KAFKA-6607) Kafka Streams lag not zero when input topic transactional

2020-02-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6607:
---
Summary: Kafka Streams lag not zero when input topic transactional  (was: 
Consumer Client and Kafka Streams lag not zero when input topic transactional)

> Kafka Streams lag not zero when input topic transactional
> -
>
> Key: KAFKA-6607
> URL: https://issues.apache.org/jira/browse/KAFKA-6607
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> When an input topic for a Consumer or Kafka Streams application is written 
> using transactions, the client does not commit "endOffset" but "endOffset - 1" 
> (or to be more precise, "lastProcessedMessageOffset + 1") if it reaches the 
> end of the topic. The reason is the commit marker, which is the last "message" 
> in the topic; Streams commits "offset of last processed message plus 1" and 
> does not take commit markers into account.
> This is not a correctness issue, but when one inspects the consumer lag via 
> {{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
> correct from the consumer-group tool's point of view.





[jira] [Commented] (KAFKA-6607) Consumer Client and Kafka Streams lag not zero when input topic transactional

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035681#comment-17035681
 ] 

ASF GitHub Bot commented on KAFKA-6607:
---

mjsax commented on pull request #8091: KAFKA-6607: Commit correct offsets for 
transactional input data
URL: https://github.com/apache/kafka/pull/8091
 
 
   
 



> Consumer Client and Kafka Streams lag not zero when input topic transactional
> -
>
> Key: KAFKA-6607
> URL: https://issues.apache.org/jira/browse/KAFKA-6607
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> When an input topic for a Consumer or Kafka Streams application is written 
> using transactions, the client does not commit "endOffset" but "endOffset - 1" 
> (or to be more precise, "lastProcessedMessageOffset + 1") if it reaches the 
> end of the topic. The reason is the commit marker, which is the last "message" 
> in the topic; Streams commits "offset of last processed message plus 1" and 
> does not take commit markers into account.
> This is not a correctness issue, but when one inspects the consumer lag via 
> {{bin/kafka-consumer-groups.sh}} the lag is shown as 1 instead of 0, which is 
> correct from the consumer-group tool's point of view.





[jira] [Resolved] (KAFKA-9499) Improve deletion process by batching more aggressively

2020-02-12 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9499.

Fix Version/s: 2.5.0
   Resolution: Fixed

> Improve deletion process by batching more aggressively
> --
>
> Key: KAFKA-9499
> URL: https://issues.apache.org/jira/browse/KAFKA-9499
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.5.0
>
>
> I have noticed that the topic deletion process could be improved by batching 
> topics here: 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/TopicDeletionManager.scala#L354]





[jira] [Commented] (KAFKA-9499) Improve deletion process by batching more aggressively

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035658#comment-17035658
 ] 

ASF GitHub Bot commented on KAFKA-9499:
---

hachikuji commented on pull request #8053: KAFKA-9499; Improve deletion process 
by batching more aggressively
URL: https://github.com/apache/kafka/pull/8053
 
 
   
 



> Improve deletion process by batching more aggressively
> --
>
> Key: KAFKA-9499
> URL: https://issues.apache.org/jira/browse/KAFKA-9499
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> I have noticed that the topic deletion process could be improved by batching 
> topics here: 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/TopicDeletionManager.scala#L354]





[jira] [Commented] (KAFKA-8187) State store record loss across multiple reassignments when using standby tasks

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035614#comment-17035614
 ] 

ASF GitHub Bot commented on KAFKA-8187:
---

mjsax commented on pull request #6849: KAFKA-8187 Bugfix for branch 2.0
URL: https://github.com/apache/kafka/pull/6849
 
 
   
 



> State store record loss across multiple reassignments when using standby tasks
> --
>
> Key: KAFKA-8187
> URL: https://issues.apache.org/jira/browse/KAFKA-8187
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Assignee: Lifei Chen
>Priority: Blocker
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Overview:
> There is a race condition that can cause a partitioned state store to be 
> missing records up to an offset when using standby tasks.
> When a reassignment occurs and a task is migrated to a StandbyTask in another 
> StreamThread/TaskManager on the same JVM, there can be lock contention that 
> prevents the StandbyTask on the currently assigned StreamThread from 
> acquiring the lock, and the lock acquisition is not retried because all of the 
> active StreamTasks are running for that StreamThread. If the StandbyTask does 
> not acquire the lock before the StreamThread enters the RUNNING state, 
> then the StandbyTask will not consume any records. If there is no subsequent 
> reassignment before the second execution of the stateDirCleaner thread, then 
> the task directory for the StandbyTask will be deleted. When the next 
> reassignment occurs, the offset that was read by the StandbyTask at creation 
> time before acquiring the lock will be written back to the state store 
> directory, which re-creates the state store directory.
> An example:
> StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
> streams application.
> StreamThread(A) has StandbyTask 1_0
> StreamThread(B) has no tasks
> A reassignment is triggered by another host in the streams application fleet.
> StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's 
> one task
> StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby 
> task for 1_0
> Here begins the race condition.
> StreamThread(B) creates the StandbyTask which reads the current checkpoint 
> from disk.
> StreamThread(B) then attempts to updateNewAndRestoringTasks() for its 
> assigned tasks. [0]
> StreamThread(B) initializes the new tasks for the active and standby tasks. 
> [1] [2]
> StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
> with a LockException [3], since StreamThread(A) still holds the lock.
> StreamThread(B) returns true from updateNewAndRestoringTasks() due to the 
> check at [4] which only checks that the active assigned tasks are running.
> StreamThread(B) state is set to RUNNING
> StreamThread(A) closes the previous StandbyTask specifically calling 
> closeStateManager() [5]
> StreamThread(A) state is set to RUNNING
> Streams application for this host has completed re-balancing and is now in 
> the RUNNING state.
> State at this point is the following: State directory exists for 1_0 and all 
> data is present.
> Then, 1 to 2 intervals of [6] (which defaults to 10 minutes) after the 
> reassignment has completed, the stateDirCleaner thread will 
> execute [7].
> The stateDirCleaner will then do [8], which finds the directory 1_0, finds 
> that there isn't an active lock for that directory, acquires the lock, and 
> deletes the directory.
> State at this point is the following: State directory does not exist for 1_0.
> When the next reassignment occurs, the offset that was read by 
> StreamThread(B) during construction of the StandbyTask for 1_0 will be 
> written back to disk. This write re-creates the state store directory and 
> writes the .checkpoint file with the old offset.
> State at this point is the following: State directory exists for 1_0 with a 
> '.checkpoint' file in it, but there is no other state store data in the 
> directory.
> If this host is assigned the active task for 1_0 then all the history in the 
> state store will be missing from before the offset that was read at the 
> previous reassignment. 
> If this host is assigned the standby task for 1_0 then the lock will be 
> acquired and the standby will start to consume records, but it will still be 
> missing all records from before the offset that was read at the 

[jira] [Updated] (KAFKA-9535) Metadata not updated when consumer encounters FENCED_LEADER_EPOCH

2020-02-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9535:
---
Affects Version/s: (was: 2.5.0)
   (was: 2.4.0)

> Metadata not updated when consumer encounters FENCED_LEADER_EPOCH
> -
>
> Key: KAFKA-9535
> URL: https://issues.apache.org/jira/browse/KAFKA-9535
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Inside the consumer Fetcher's handling of ListOffsetResponse, if we hit 
> `FENCED_LEADER_EPOCH` at the partition level, the client will blindly retry 
> without refreshing the metadata, creating a stuck state because the local 
> leader epoch never gets updated and constantly fails the broker check.
> The solution is to trigger a metadata update upon receiving retriable errors, 
> before we kick off another list-offsets request.
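
The stuck state described above can be sketched in a few lines. This is only an illustration under stated assumptions: `CachedMetadata`, `listOffsets` and `listOffsetsWithRetry` are hypothetical stand-ins, not the real Fetcher or Metadata classes, and the "metadata update" is modelled as simply adopting the broker's current epoch.

```java
/** Hypothetical sketch; these names are not the real consumer internals. */
final class ListOffsetsRetrySketch {
    enum Result { OK, FENCED_LEADER_EPOCH }

    /** Stand-in for the client's cached metadata for one partition. */
    static final class CachedMetadata {
        int leaderEpoch;

        CachedMetadata(int leaderEpoch) {
            this.leaderEpoch = leaderEpoch;
        }
    }

    /** Stand-in for the broker check: a request with an older epoch is fenced. */
    static Result listOffsets(int requestEpoch, int brokerEpoch) {
        return requestEpoch < brokerEpoch ? Result.FENCED_LEADER_EPOCH : Result.OK;
    }

    /** Returns the attempt number that succeeded, or -1 if every attempt was fenced. */
    static int listOffsetsWithRetry(CachedMetadata cached, int brokerEpoch,
                                    int maxAttempts, boolean refreshOnRetriableError) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (listOffsets(cached.leaderEpoch, brokerEpoch) == Result.OK) {
                return attempt;
            }
            if (refreshOnRetriableError) {
                // Stand-in for a metadata update: adopt the broker's current epoch.
                cached.leaderEpoch = brokerEpoch;
            }
        }
        return -1;
    }
}
```

Without the refresh, the loop retries with the same stale epoch until it gives up; with the refresh, the second attempt goes through.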





[jira] [Commented] (KAFKA-8073) Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer

2020-02-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035598#comment-17035598
 ] 

ASF GitHub Bot commented on KAFKA-8073:
---

chia7712 commented on pull request #8098: KAFKA-8073 Transient failure in 
kafka.api.UserQuotaTest.testThrottled…
URL: https://github.com/apache/kafka/pull/8098
 
 
   The input data in testThrottledProducerConsumer is 1000 messages, and each 
message carries only the bytes of a string converted from an int value. The 
quota is 8000 bytes/second, so the throttle fails to start if the machine is 
too slow to complete all requests. It is a timer-based test case. The workaround 
is to increase the size of the input data.
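
A rough back-of-the-envelope check of how small that input is. This is a sketch, assuming the values are the integers 0 to 999 rendered as decimal strings, and counting payload bytes only (record and batch overhead are ignored); `payloadBytes` and `secondsOfQuota` are names made up for the illustration.

```java
import java.nio.charset.StandardCharsets;

/** Illustration only: payload size of N int-as-string messages versus a byte quota. */
final class QuotaPayloadSketch {
    /** Total payload bytes when message i carries Integer.toString(i), for i in [0, numRecords). */
    static long payloadBytes(int numRecords) {
        long total = 0;
        for (int i = 0; i < numRecords; i++) {
            total += Integer.toString(i).getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    /** How many seconds of quota the given number of bytes represents. */
    static double secondsOfQuota(long bytes, long quotaBytesPerSecond) {
        return (double) bytes / quotaBytesPerSecond;
    }
}
```

Under those assumptions, 1000 messages add up to 2890 payload bytes, well under one second's worth of an 8000 bytes/second quota, so whether the rate is ever exceeded depends on how quickly the machine sends them.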
   


> Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer
> --
>
> Key: KAFKA-8073
> URL: https://issues.apache.org/jira/browse/KAFKA-8073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Bill Bejeck
>Assignee: Chia-Ping Tsai
>Priority: Critical
> Fix For: 2.2.3, 2.5.0
>
>
> Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20134/]
>  
> Stacktrace and STDOUT
> {noformat}
> Error Message
> java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have 
> been throttled
> Stacktrace
> java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have 
> been throttled
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229)
>   at 
> kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215)
>   at 
> kafka.api.BaseQuotaTest.testThrottledProducerConsumer(BaseQuotaTest.scala:82)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> 

[jira] [Commented] (KAFKA-9542) ZSTD Compression Not Working

2020-02-12 Thread Lucas Bradstreet (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035594#comment-17035594
 ] 

Lucas Bradstreet commented on KAFKA-9542:
-

Hi Prashant,

Can you please check whether /tmp is mounted noexec? The library is extracted 
to the tmp directory and then loaded from there, so a noexec mount is 
problematic. If it is, you can confirm that this is the problem by using 
KAFKA_OPTS to supply -Djava.io.tmpdir=/path/to/other/dir and seeing whether it 
works with an alternative tmpdir.
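
One way to run that check from the JVM side is sketched below. It assumes a Linux host where /proc/mounts is readable and mount points contain no spaces; `TmpDirNoexecCheck` and `isNoexec` are made-up names, not part of Kafka.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

/** Illustration only: does the mount that holds a directory carry the noexec option? */
final class TmpDirNoexecCheck {
    /** mountLines use the /proc/mounts layout: device, mount point, fs type, options, ... */
    static boolean isNoexec(List<String> mountLines, Path dir) {
        Path target = dir.toAbsolutePath().normalize();
        Path bestMount = null;
        boolean noexec = false;
        for (String line : mountLines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length < 4) {
                continue; // not a well-formed mount entry
            }
            Path mount = Paths.get(fields[1]);
            if (!target.startsWith(mount)) {
                continue; // this mount does not contain the directory
            }
            // Prefer the deepest mount point; on a tie the later entry wins.
            if (bestMount == null || mount.getNameCount() >= bestMount.getNameCount()) {
                bestMount = mount;
                noexec = Arrays.asList(fields[3].split(",")).contains("noexec");
            }
        }
        return noexec;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Paths.get(System.getProperty("java.io.tmpdir"));
        List<String> mounts = Files.readAllLines(Paths.get("/proc/mounts"));
        System.out.println(tmp + " noexec=" + isNoexec(mounts, tmp));
    }
}
```

Running it with the same -Djava.io.tmpdir value that the broker gets through KAFKA_OPTS shows whether the alternative directory avoids the noexec mount.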

> ZSTD Compression Not Working
> 
>
> Key: KAFKA-9542
> URL: https://issues.apache.org/jira/browse/KAFKA-9542
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 2.3.0
> Environment: Linux, CentOS
>Reporter: Prashant
>Priority: Critical
>
> I enabled zstd compression at the producer by adding "compression.type=zstd" 
> to the producer config. When I try to run it, the producer fails with 
> "org.apache.kafka.common.errors.UnknownServerException: The server 
> experienced an unexpected error when processing the request"
> In the broker logs, I found the following exception:
>  
> [2020-02-12 11:48:04,623] ERROR [ReplicaManager broker=1] Error processing 
> append operation on partition load_logPlPts-6 (kafka.server.ReplicaManager)
> org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could 
> not initialize class 
> org.apache.kafka.common.record.CompressionType$ZstdConstructors
>        at 
> org.apache.kafka.common.record.CompressionType$5.wrapForInput(CompressionType.java:133)
>        at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257)
>        at 
> org.apache.kafka.common.record.DefaultRecordBatch.iterator(DefaultRecordBatch.java:324)
>        at 
> scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54)
>        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>        at 
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:269)
>        at 
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:261)
>        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>        at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:261)
>        at 
> kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:72)
>        at kafka.log.Log$$anonfun$append$2.liftedTree1$1(Log.scala:869)
>        at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868)
>        at kafka.log.Log$$anonfun$append$2.apply(Log.scala:850)
>        at kafka.log.Log.maybeHandleIOException(Log.scala:2065)
>        at kafka.log.Log.append(Log.scala:850)
>        at kafka.log.Log.appendAsLeader(Log.scala:819)
>        at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:771)
>        at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:759)
>        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253) 
>  
> This is a fresh broker installed on "CentOS Linux" v7. This doesn't seem to 
> be a classpath issue, as the same package works on MacOS. 
>  





[jira] [Assigned] (KAFKA-8073) Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer

2020-02-12 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-8073:
-

Assignee: Chia-Ping Tsai

> Transient failure in kafka.api.UserQuotaTest.testThrottledProducerConsumer
> --
>
> Key: KAFKA-8073
> URL: https://issues.apache.org/jira/browse/KAFKA-8073
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Bill Bejeck
>Assignee: Chia-Ping Tsai
>Priority: Critical
> Fix For: 2.2.3, 2.5.0
>
>
> Failed in build [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20134/]
>  
> Stacktrace and STDOUT
> {noformat}
> Error Message
> java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have 
> been throttled
> Stacktrace
> java.lang.AssertionError: Client with id=QuotasTestProducer-1 should have 
> been throttled
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> kafka.api.QuotaTestClients.verifyThrottleTimeMetric(BaseQuotaTest.scala:229)
>   at 
> kafka.api.QuotaTestClients.verifyProduceThrottle(BaseQuotaTest.scala:215)
>   at 
> kafka.api.BaseQuotaTest.testThrottledProducerConsumer(BaseQuotaTest.scala:82)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> 

[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2020-02-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1703#comment-1703
 ] 

Matthias J. Sax commented on KAFKA-7965:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4665/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/]

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 1.1.1, 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68
>  at org.junit.Assert.fail(Assert.java:88)
>  at org.junit.Assert.assertTrue(Assert.java:41)
>  at kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557)
>  at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}





[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

2020-02-12 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035541#comment-17035541
 ] 

Sophie Blee-Goldman commented on KAFKA-8037:


Ha, yes good catch I'll fix the typo in 2)

That's a good point about later valid records potentially overwriting a 
corrupted one; I suppose we can't just load the two changelogs one after 
another, and would have to restore from both at the same time by choosing the 
next record in the original order. Might be tricky to synchronize but not 
impossible.

I think this should still work for key deserialization errors though; we would 
still copy the plain key bytes with a null value on restore, which is treated 
as a delete by the store (without ever having to deserialize the corrupted key)
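
To make the ordering concern concrete, here is a small sketch. It assumes each record in the second changelog (key bytes with a null value, written for records that were dropped during normal processing) also carries the input offset of the record it refers to; that offset field, the `Rec` class and both method names are assumptions of the sketch, not anything Kafka Streams provides. Keys are shown as strings for brevity.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: restoring from two changelogs one after another versus in original order. */
final class MergedRestoreSketch {
    /** A raw record; a null value is treated as a delete by the store. */
    static final class Rec {
        final long offset;
        final String key;
        final byte[] value;

        Rec(long offset, String key, byte[] value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    static void apply(Map<String, byte[]> store, Rec r) {
        if (r.value == null) {
            store.remove(r.key);
        } else {
            store.put(r.key, r.value);
        }
    }

    /** Loads the whole input changelog first, then all tombstones. */
    static Map<String, byte[]> restoreSequential(List<Rec> input, List<Rec> tombstones) {
        Map<String, byte[]> store = new HashMap<>();
        for (Rec r : input) {
            apply(store, r);
        }
        for (Rec r : tombstones) {
            apply(store, r);
        }
        return store;
    }

    /** Picks the next record in original offset order; on a tie the input record goes first. */
    static Map<String, byte[]> restoreMerged(List<Rec> input, List<Rec> tombstones) {
        Map<String, byte[]> store = new HashMap<>();
        int i = 0;
        int j = 0;
        while (i < input.size() || j < tombstones.size()) {
            boolean takeInput = j >= tombstones.size()
                    || (i < input.size() && input.get(i).offset <= tombstones.get(j).offset);
            apply(store, takeInput ? input.get(i++) : tombstones.get(j++));
        }
        return store;
    }
}
```

With input records at offsets 0 (valid), 1 (corrupted) and 2 (valid) for the same key, and one tombstone for offset 1, the sequential restore ends with the key deleted, while the merged restore ends with the value from offset 2.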

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for the 
> GlobalKTable case). It's unclear to me at the moment how this issue could be 
> addressed for KTables though.
> Note that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.





[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data

2020-02-12 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17034746#comment-17034746
 ] 

Sophie Blee-Goldman edited comment on KAFKA-8037 at 2/12/20 5:38 PM:
-

[~pkleindl] [~vvcephei] [~mjsax] [~guozhang] I have kind of a different 
approach that might be a good compromise. We want to satisfy all of the 
following three goals as much as possible:
 # reuse the input topic as the changelog to avoid replicating all that data
 # make restoration as fast as possible by copying plain bytes with_out_ 
deserializing
 # don't load bad data, or copy bytes during restoration that weren't copied 
during normal processing

As this ticket points out, we currently sacrifice 3) for the sake of 1) and 2) 
with the workaround being to give up 1) by turning off optimization. But what 
if we instead created a new type of internal topic that's essentially an 
"inverse-changelog" – rather than sending every record that goes to the store 
to this inverse-changelog, we send only the records that _don't_ get put into 
the store? In fact we don't even store the entire record, just the key bytes 
with a null value. Then once we've restored from the input-topic-changelog, we 
read from the inverse-changelog and any bad records get deleted without ever 
having to deserialize or store the value twice.
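
A minimal sketch of that two-phase restore, assuming an in-memory map stands in for the state store and keys are shown as strings rather than raw bytes. `RawRecord` and `restore` are hypothetical names; nothing here is an existing Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: restore from the input topic, then apply the "inverse-changelog". */
final class InverseChangelogRestoreSketch {
    /** A raw record; a null value is a tombstone. */
    static final class RawRecord {
        final String key;
        final byte[] value;

        RawRecord(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    static Map<String, byte[]> restore(List<RawRecord> inputTopic, List<RawRecord> inverseChangelog) {
        Map<String, byte[]> store = new LinkedHashMap<>();
        // Phase 1: copy plain bytes from the input topic, bad records included.
        for (RawRecord r : inputTopic) {
            apply(store, r);
        }
        // Phase 2: keys that were dropped during normal processing arrive with a
        // null value and are deleted without any deserialization.
        for (RawRecord r : inverseChangelog) {
            apply(store, r);
        }
        return store;
    }

    private static void apply(Map<String, byte[]> store, RawRecord r) {
        if (r.value == null) {
            store.remove(r.key);
        } else {
            store.put(r.key, r.value);
        }
    }
}
```

Neither phase deserializes a value, which is what keeps goal 2) intact in this sketch.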

My one concern is over the tracking/handling of local stream time: if we load 
the bad data during the initial restoration, it might bump up the stream time 
when it shouldn't have and potentially cause older, valid records to get 
dropped. If that's a real concern, then this approach would be blocked on 
KAFKA-9368 – however, I'm not confident that corrupted records don't currently 
bump the stream-time even during normal processing, and also not sure what kind 
of guarantees we should or do make w.r.t deserialization exceptions. 

Thoughts?




[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-12 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035517#comment-17035517
 ] 

Evan Williams commented on KAFKA-4084:
--

[~sql_consulting] [~junrao]

FYI, I implemented separation of replication/controller traffic, and this 
helped somewhat. However, I think in the end the only real solution is KIP-491.

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts it causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to rebalance automatically at once, and to *stop* once that number 
> of leader rebalances is in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.
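
As a rough sketch of the proposed limit: the controller would only move a bounded group of leaders at a time and wait for that group to finish before starting the next. The planner below is hypothetical; `planBatches` and `maxInFlight` are illustrative names, and no such broker configuration is claimed to exist.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: split imbalanced partitions into bounded rebalance batches. */
final class ThrottledLeaderRebalanceSketch {
    /** Each returned batch holds at most maxInFlight partitions, in the original order. */
    static List<List<String>> planBatches(List<String> imbalancedPartitions, int maxInFlight) {
        if (maxInFlight <= 0) {
            throw new IllegalArgumentException("maxInFlight must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < imbalancedPartitions.size(); start += maxInFlight) {
            int end = Math.min(start + maxInFlight, imbalancedPartitions.size());
            batches.add(new ArrayList<>(imbalancedPartitions.subList(start, end)));
        }
        return batches;
    }
}
```

A caller would trigger the election for one batch, wait until those leader changes are done, and only then move to the next batch, so replica fetchers for the remaining partitions keep running in the meantime.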





[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-12 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035517#comment-17035517
 ] 

Evan Williams edited comment on KAFKA-4084 at 2/12/20 5:08 PM:
---

[~sql_consulting] [~junrao]

FYI, I implemented separation of replication/controller/client traffic, and 
this helped somewhat. However, I think in the end the only real solution is 
KIP-491.








[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On

2020-02-12 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035500#comment-17035500
 ] 

John Roesler commented on KAFKA-9517:
-

Hi [~psnively],

Just another quick update: all the reported bugs for foreign-key joins are now 
fixed and merged to the branches trunk, 2.5, and 2.4. If you want to test the 
fixes, you can now just check out and build the 2.4 branch without any 
cherry-picking.

There is still a blocker for 2.4.1, so [~bbejeck] hasn't created an actual 
release candidate yet.

Thanks,
-John

> KTable Joins Without Materialized Argument Yield Results That Further Joins 
> NPE On
> --
>
> Key: KAFKA-9517
> URL: https://issues.apache.org/jira/browse/KAFKA-9517
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Paul Snively
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.5.0, 2.4.1
>
> Attachments: test.tar.xz
>
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
> calls `doJoinOnForeignKey` with an argument of 
> `Materialized.with(null, null)`, as apparently do several other APIs. As the 
> comment spanning [these lines|#L1098-L1099] makes clear, the result is a 
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, 
> attempts to `join` etc. on the resulting `KTable` fail with a 
> `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.





[jira] [Created] (KAFKA-9543) Consumer offset reset after new segment rolling

2020-02-12 Thread Rafał Boniecki (Jira)
Rafał Boniecki created KAFKA-9543:
-

 Summary: Consumer offset reset after new segment rolling
 Key: KAFKA-9543
 URL: https://issues.apache.org/jira/browse/KAFKA-9543
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0
Reporter: Rafał Boniecki
 Attachments: Untitled.png

After upgrading from Kafka 2.1.1 to 2.4.0, I'm experiencing unexpected consumer 
offset resets.

Consumer:
{code:java}
2020-02-12T11:12:58.402+01:00 hostname 4a2a39a35a02 
[2020-02-12T11:12:58,402][INFO 
][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer 
clientId=logstash-1, groupId=logstash] Fetch offset 1632750575 is out of range 
for partition stats-5, resetting offset
{code}
Broker:
{code:java}
2020-02-12 11:12:58:400 CET INFO  [data-plane-kafka-request-handler-1][kafka.log.Log] [Log partition=stats-5, dir=/kafka4/data] Rolled new log segment at offset 1632750565 in 2 ms.
{code}
All resets correlate exactly with the broker rolling a new segment: the 
segment is rolled first and then, a couple of ms later, the reset occurs on the 
consumer. Attached is a Grafana graph of consumer lag per partition. All sudden 
spikes in lag are offset resets caused by this bug.





[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-12 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035407#comment-17035407
 ] 

Bruno Cadonna commented on KAFKA-9533:
--

[~mviamari] Would you be interested in fixing this bug?

> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-])
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}
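
For illustration, the behaviour that the report says the documentation promises could be sketched as below. This is a minimal sketch under stated assumptions, not the project's fix: `SimpleValueTransformer` and `NullSkippingProcessor` are hypothetical stand-ins written with the JDK only, and the `forwarded` list stands in for `context.forward`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a value transformer; not the Kafka Streams interface.
interface SimpleValueTransformer<K, V, R> {
    R transform(K key, V value);
}

// Hypothetical processor that forwards only non-null transform results.
class NullSkippingProcessor<K, V, R> {
    private final SimpleValueTransformer<K, V, R> transformer;
    // Records "forwarded" downstream, kept as "key=value" strings for inspection.
    final List<String> forwarded = new ArrayList<>();

    NullSkippingProcessor(SimpleValueTransformer<K, V, R> transformer) {
        this.transformer = transformer;
    }

    void process(K key, V value) {
        R result = transformer.transform(key, value);
        // The null check is the step missing from the snippet quoted above.
        if (result != null) {
            forwarded.add(key + "=" + result);
        }
    }
}
```

With a transformer that returns `null` for negative inputs, only the records with non-negative values end up in `forwarded`.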





[jira] [Created] (KAFKA-9542) ZSTD Compression Not Working

2020-02-12 Thread Prashant (Jira)
Prashant created KAFKA-9542:
---

 Summary: ZSTD Compression Not Working
 Key: KAFKA-9542
 URL: https://issues.apache.org/jira/browse/KAFKA-9542
 Project: Kafka
  Issue Type: Bug
  Components: compression
Affects Versions: 2.3.0
 Environment: Linux, CentOS
Reporter: Prashant


I enabled zstd compression on the producer by adding "compression.type=zstd" to 
the producer config. When I try to run it, the producer fails with 

"org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request"

In the broker logs, I found the following exception:

[2020-02-12 11:48:04,623] ERROR [ReplicaManager broker=1] Error processing append operation on partition load_logPlPts-6 (kafka.server.ReplicaManager)
org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.common.record.CompressionType$ZstdConstructors
       at org.apache.kafka.common.record.CompressionType$5.wrapForInput(CompressionType.java:133)
       at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257)
       at org.apache.kafka.common.record.DefaultRecordBatch.iterator(DefaultRecordBatch.java:324)
       at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:54)
       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
       at kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:269)
       at kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(LogValidator.scala:261)
       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
       at kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:261)
       at kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:72)
       at kafka.log.Log$$anonfun$append$2.liftedTree1$1(Log.scala:869)
       at kafka.log.Log$$anonfun$append$2.apply(Log.scala:868)
       at kafka.log.Log$$anonfun$append$2.apply(Log.scala:850)
       at kafka.log.Log.maybeHandleIOException(Log.scala:2065)
       at kafka.log.Log.append(Log.scala:850)
       at kafka.log.Log.appendAsLeader(Log.scala:819)
       at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:771)
       at kafka.cluster.Partition$$anonfun$14.apply(Partition.scala:759)
       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)

 

This is a fresh broker installed on "CentOS Linux" v7. This doesn't seem to be a 
classpath issue, as the same package works on macOS. 
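
As a general JVM note that may or may not apply here: a `NoClassDefFoundError` with the message "Could not initialize class ..." usually means an earlier attempt to run that class's static initializer failed, so the root cause would appear with the first failure rather than with this one. The sketch below reproduces that pattern with the JDK only. `FailingInit` and `InitFailureDemo` are hypothetical names, and the simulated failure is an assumption, not a diagnosis of this broker.

```java
// Hypothetical class whose static initializer always fails, standing in for
// any class that cannot complete its static setup.
class FailingInit {
    // Not a compile-time constant, so reading it triggers class initialization.
    static final int VALUE = compute();

    static int compute() {
        throw new IllegalStateException("simulated failure during static initialization");
    }
}

class InitFailureDemo {
    // Touches FailingInit and returns the simple name of whatever is thrown.
    static String touch() {
        try {
            return String.valueOf(FailingInit.VALUE);
        } catch (Throwable t) {
            return t.getClass().getSimpleName();
        }
    }
}
```

The first call to `InitFailureDemo.touch()` reports `ExceptionInInitializerError`, which carries the real cause. Every later call reports `NoClassDefFoundError`, which is the symptom seen in the trace above.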

 





[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

2020-02-12 Thread Patrik Kleindl (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035215#comment-17035215
 ] 

Patrik Kleindl commented on KAFKA-8037:
---

[~ableegoldman] Interesting proposal.
At 2. you mean "with_out_ deserializing", right?

One question for me would be how you would remove records from the 
inverse-changelog in case a good record is received after a bad one. Otherwise 
the good one would be loaded and removed afterwards, unless you do some extra 
timestamp checking etc.

And if I understand it correctly, this would only work for problems with value 
deserialization, not if the key is affected, which is rare but possible.

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be bypassed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to look up the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663, which may allow a fix for the 
> GlobalKTable case). It's unclear to me at the moment how this issue could be 
> addressed for KTables, though.
> Note that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.
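
The two code paths described in the issue can be modelled roughly as follows. This is a toy sketch, assuming a store that is just a map of raw bytes and a deserializer that accepts only decimal integers. `RestoreSketch` and its methods are hypothetical and do not correspond to Kafka Streams classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical model of the read path and the restore path; not Kafka code.
class RestoreSketch {
    // Toy deserializer: a value is valid only if it parses as a decimal integer.
    static int deserialize(byte[] bytes) {
        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
    }

    // Read path: a corrupt record is dropped, mimicking an exception handler
    // that is configured to skip records that fail deserialization.
    static void readPath(Map<String, byte[]> store, String key, byte[] value) {
        try {
            deserialize(value);
            store.put(key, value);
        } catch (NumberFormatException e) {
            // Record is dropped and never reaches the store.
        }
    }

    // Restore path: raw bytes are copied without being deserialized, so a
    // corrupt record lands in the store unnoticed.
    static void restorePath(Map<String, byte[]> store, String key, byte[] value) {
        store.put(key, value);
    }
}
```

Feeding the same corrupt record through both methods leaves it out of the first store but puts it into the second, where a later `deserialize` on lookup fails. This mirrors the `store.get()` failure described above.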





[jira] [Issue Comment Deleted] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams

2020-02-12 Thread highluck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

highluck updated KAFKA-9455:

Comment: was deleted

(was: [~guozhang]

I have one more question

What do you think about splitting a WindowStore into two stores?)

> Consider using TreeMap for in-memory stores of Streams
> --
>
> Key: KAFKA-9455
> URL: https://issues.apache.org/jira/browse/KAFKA-9455
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>  Labels: newbie++
>
> From [~ableegoldman]: It's worth noting that it might be a good idea to 
> switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap 
> allows us to safely perform range queries without copying over the entire 
> keyset, but the performance on point queries seems to scale noticeably worse 
> with the number of unique keys. Point queries are used by aggregations while 
> range queries are used by windowed joins, but of course both are available 
> within the PAPI and for interactive queries so it's hard to say which we 
> should prefer. Maybe rather than make that tradeoff we should have one 
> version for efficient range queries (a "JoinWindowStore") and one for 
> efficient point queries ("AggWindowStore") - or something. I know we've had 
> similar thoughts for a different RocksDB store layout for Joins (although I 
> can't find that ticket anywhere..), it seems like the in-memory stores could 
> benefit from a special "Join" version as well cc/ Guozhang Wang
> Here are some random thoughts:
> 1. For Kafka Streams processing logic (i.e. without IQ), it's better to make 
> all processing logic rely on point queries rather than range queries. 
> Right now the only processors that use range queries are, as mentioned above, 
> windowed stream-stream joins. I think we should consider using a different 
> window implementation for this (and as a result also get rid of the 
> retainDuplicate flags) to refactor the windowed stream-stream join operation.
> 2. With 1), range queries would only be exposed as IQ. Depending on its usage 
> frequency I think it makes a lot of sense to optimize for single-point queries.
> Of course, even without step 1) we should still consider using TreeMap for 
> windowed in-memory stores to have a better scaling effect.
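
To make the two query shapes concrete, here is a small sketch using only the JDK. `WindowStoreSketch` is a hypothetical helper, keys are plain `Long` timestamps rather than the store's real key format, and nothing here measures performance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper; works with any NavigableMap implementation.
class WindowStoreSketch {
    // Point query: the value stored at exactly one timestamp, or null.
    static String point(NavigableMap<Long, String> store, long ts) {
        return store.get(ts);
    }

    // Range query: values with timestamps in [from, to], in key order.
    static List<String> range(NavigableMap<Long, String> store, long from, long to) {
        return new ArrayList<>(store.subMap(from, true, to, true).values());
    }

    // Fills a store with five entries at timestamps 0, 10, 20, 30, 40.
    static NavigableMap<Long, String> fill(NavigableMap<Long, String> store) {
        for (long i = 0; i < 5; i++) {
            store.put(i * 10, "v" + i);
        }
        return store;
    }

    static NavigableMap<Long, String> treeStore() {
        return fill(new TreeMap<>());
    }

    static NavigableMap<Long, String> skipListStore() {
        return fill(new ConcurrentSkipListMap<>());
    }
}
```

Both maps answer both query shapes through the same `NavigableMap` interface. The trade-off is elsewhere: `TreeMap` is not thread-safe, so concurrent readers need external synchronization or a copy, while `ConcurrentSkipListMap` iterators are weakly consistent and can be used while the map is being updated.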


