[jira] [Resolved] (KAFKA-8392) Kafka broker leaks metric when partition leader moves to another node.

2019-07-19 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8392.
--
   Resolution: Fixed
Fix Version/s: 2.4.0

> Kafka broker leaks metric when partition leader moves to another node.
> --
>
> Key: KAFKA-8392
> URL: https://issues.apache.org/jira/browse/KAFKA-8392
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.2.0
>Reporter: Kamal Chandraprakash
>Assignee: Tu Tran
>Priority: Major
> Fix For: 2.4.0
>
>
> When a partition leader moves from one node to another due to an imbalance in 
> leader.imbalance.per.broker.percentage, the old leader broker still emits the 
> static metric value.
> Steps to reproduce:
> 1. Create a cluster with 3 nodes.
> 2. Create a topic with 2 partitions and RF=3
> 3. Generate some data using the console producer.
> 4. Move any one of the partitions from one node to another using the
> reassign-partitions and preferred-replica-election scripts.
> 5. Generate some data using the console producer.
> 6. Now all 3 nodes emit bytesIn, bytesOut, and MessagesIn for that topic.
> Is it the expected behavior?
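One way to observe the leak is to poll the per-topic MBean directly over JMX. A minimal probe sketch, assuming JMX is enabled on the broker at localhost:9999 and using a placeholder topic name:
{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class TopicMetricProbe {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with JMX enabled on port 9999.
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        MBeanServerConnection conn =
            JMXConnectorFactory.connect(url).getMBeanServerConnection();

        // BrokerTopicMetrics beans are registered per topic; after the leader
        // moves away, a broker that no longer leads any partition of the topic
        // should not keep reporting a rate here ("my-topic" is a placeholder).
        ObjectName bytesIn = new ObjectName(
            "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=my-topic");
        System.out.println("OneMinuteRate = " + conn.getAttribute(bytesIn, "OneMinuteRate"));
    }
}
{code}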





[jira] [Commented] (KAFKA-8392) Kafka broker leaks metric when partition leader moves to another node.

2019-07-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889211#comment-16889211
 ] 

ASF GitHub Bot commented on KAFKA-8392:
---

guozhangwang commented on pull request #6977: KAFKA-8392: Fix old metrics 
leakage by brokers that have no leadership over any partition for a topic
URL: https://github.com/apache/kafka/pull/6977
 
 
   
 



> Kafka broker leaks metric when partition leader moves to another node.
> --
>
> Key: KAFKA-8392
> URL: https://issues.apache.org/jira/browse/KAFKA-8392
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.2.0
>Reporter: Kamal Chandraprakash
>Assignee: Tu Tran
>Priority: Major
>
> When a partition leader moves from one node to another due to an imbalance in 
> leader.imbalance.per.broker.percentage, the old leader broker still emits the 
> static metric value.
> Steps to reproduce:
> 1. Create a cluster with 3 nodes.
> 2. Create a topic with 2 partitions and RF=3
> 3. Generate some data using the console producer.
> 4. Move any one of the partitions from one node to another using the
> reassign-partitions and preferred-replica-election scripts.
> 5. Generate some data using the console producer.
> 6. Now all 3 nodes emit bytesIn, bytesOut, and MessagesIn for that topic.
> Is it the expected behavior?





[jira] [Commented] (KAFKA-8666) Improve Documentation on usage of Materialized config object

2019-07-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889191#comment-16889191
 ] 

Matthias J. Sax commented on KAFKA-8666:


Seems some people think differently. Compare the linked Jira tickets. Feel free 
to comment on those, too.

> Improve Documentation on usage of Materialized config object
> 
>
> Key: KAFKA-8666
> URL: https://issues.apache.org/jira/browse/KAFKA-8666
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: newbie
>
> When using the Materialized object if the user wants to name the statestore 
> with
> {code:java}
> Materialized.as("MyStoreName"){code}
> then subsequently provide the key and value serde the calls to do so must 
> take the form of 
> {code:java}
> Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde)
> {code}
> If users do the following 
> {code:java}
> Materialized.as("MyStoreName").with(keySerde, valueSerde)
> {code}
> the Materialized instance created by the "as(storeName)" call is replaced by 
> a new Materialized instance resulting from the "with(...)" call and any 
> configuration on the first Materialized instance is lost.
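A minimal sketch of the pitfall described above, against the public Kafka Streams API (store name and serdes are placeholders):
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class MaterializedPitfall {
    public static void main(String[] args) {
        // Intended: one instance, configured step by step; the store name survives.
        Materialized<String, Long, KeyValueStore<Bytes, byte[]>> good =
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("MyStoreName")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long());

        // Pitfall: with(...) is static, so this expression builds a second,
        // unrelated instance and the "MyStoreName" configuration is lost.
        Materialized<String, Long, KeyValueStore<Bytes, byte[]>> bad =
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("MyStoreName")
                .with(Serdes.String(), Serdes.Long());
    }
}
{code}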





[jira] [Resolved] (KAFKA-8663) partition assignment would be better original_assignment + new_reassignment during reassignments

2019-07-19 Thread GEORGE LI (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GEORGE LI resolved KAFKA-8663.
--
Resolution: Won't Fix

It looks like RAR + OAR is required for KIP-455 to preserve the exact ordering 
of targetReplicas, and the old replicas that need to be dropped are tracked in 
the new removingReplicas field of the /brokers/topics/ ZK node. 


> partition assignment would be better original_assignment + new_reassignment 
> during reassignments
> 
>
> Key: KAFKA-8663
> URL: https://issues.apache.org/jira/browse/KAFKA-8663
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 1.1.1, 2.3.0
>Reporter: GEORGE LI
>Priority: Minor
>
> From my observation during reassignment, the partition assignment replica 
> ordering gets changed, because the controller writes the set union RAR + OAR 
> (reassignment replicas + original replicas).
> However, this means the preferred leader can change during the reassignment. 
> Normally, if there is no cluster preferred-leader election, the leader is 
> still the old leader. But if a leader election happens during the 
> reassignment, the leadership changes, which causes some side effects. Let's 
> look at this example.
> {code}
> Topic:georgeli_test  PartitionCount:8  ReplicationFactor:3  Configs:
>   Topic: georgeli_test  Partition: 0  Leader: 1026  Replicas: 1026,1028,1025  Isr: 1026,1028,1025
> {code}
> reassignment  (1026,1028,1025) => (1027,1025,1028)
> {code}
> Topic:georgeli_test  PartitionCount:8  ReplicationFactor:4  Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027
>   Topic: georgeli_test  Partition: 0  Leader: 1026  Replicas: 1027,1025,1028,1026  Isr: 1026,1028,1025
> {code}
> Notice the above: the leader remains 1026, but Replicas is now 
> 1027,1025,1028,1026. If we run preferred leader election, it will try 1027 
> first, then 1025.
> After 1027 is in the ISR, the final assignment will be (1027,1025,1028).
> My proposal for a minor improvement is to keep the original replica ordering 
> during the reassignment (which could take long for big topics/partitions), 
> and only after all replicas are in the ISR, finally set the partition 
> assignment to the new reassignment.
> {code}
>   val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicPartition)).toSet
>   //1. Update AR in ZK with OAR + RAR.
>   updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas.toSeq)
> {code} 
> The above code would change to the following, to keep the original ordering 
> first during the reassignment: 
> {code}
>   val newAndOldReplicas = (controllerContext.partitionReplicaAssignment(topicPartition) ++ reassignedPartitionContext.newReplicas).toSet
> {code}
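A small standalone sketch of the two concatenation orders, using the broker IDs from the example above and an order-preserving union (Java here, mirroring the Scala `++` / `toSet` / `toSeq` sequence) to make the resulting replica ordering visible:
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class ReplicaOrderingDemo {
    // Order-preserving set union, mimicking `++` followed by toSet/toSeq.
    static List<Integer> union(List<Integer> first, List<Integer> second) {
        LinkedHashSet<Integer> merged = new LinkedHashSet<>(first);
        merged.addAll(second);
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        List<Integer> oar = Arrays.asList(1026, 1028, 1025); // original replicas
        List<Integer> rar = Arrays.asList(1027, 1025, 1028); // reassignment replicas

        // Current behaviour (RAR ++ OAR): the new replicas lead the ordering.
        System.out.println(union(rar, oar)); // [1027, 1025, 1028, 1026]

        // Proposed (OAR ++ RAR): the original preferred leader stays first.
        System.out.println(union(oar, rar)); // [1026, 1028, 1025, 1027]
    }
}
{code}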





[jira] [Created] (KAFKA-8693) Flakey test ErrorHandlingIntegrationTest #testSkipRetryAndDLQWithHeaders

2019-07-19 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8693:
--

 Summary: Flakey test  ErrorHandlingIntegrationTest 
#testSkipRetryAndDLQWithHeaders
 Key: KAFKA-8693
 URL: https://issues.apache.org/jira/browse/KAFKA-8693
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/438/console]

 
{noformat}
ERROR: Failed to write output for test null.Gradle Test Executor 25
java.lang.NullPointerException: Cannot invoke method write() on null object
        at org.codehaus.groovy.runtime.NullObject.invokeMethod(NullObject.java:91)
        at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.call(PogoMetaClassSite.java:47)
        at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)
        at org.codehaus.groovy.runtime.callsite.NullCallSite.call(NullCallSite.java:34)
        at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)
        at java_io_FileOutputStream$write.call(Unknown Source)
        at build_ctl3wpaytn6q6nw5ya090go8u$_run_closure5$_closure75$_closure108.doCall(/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/build.gradle:244)
        at jdk.internal.reflect.GeneratedMethodAccessor280.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:104)
        at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:326)
        at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:264)
        at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1041)
        at groovy.lang.Closure.call(Closure.java:411)
        at org.gradle.listener.ClosureBackedMethodInvocationDispatch.dispatch(ClosureBackedMethodInvocationDispatch.java:40)
        at org.gradle.listener.ClosureBackedMethodInvocationDispatch.dispatch(ClosureBackedMethodInvocationDispatch.java:25)
        at org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:42)
        at org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:230)
        at org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:149)
        at org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:58)
        at org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:324)
        at org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:234)
        at org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:140)
        at org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:37)
        at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
        at com.sun.proxy.$Proxy73.onOutput(Unknown Source)
        at org.gradle.api.internal.tasks.testing.results.TestListenerAdapter.output(TestListenerAdapter.java:56)
        at jdk.internal.reflect.GeneratedMethodAccessor260.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:42)
        at org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:230)
        at org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:149)
        at org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:58)
        at org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:324)
        at org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:234)
        at org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:140)
        at org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:37)
        at
{noformat}

[jira] [Created] (KAFKA-8692) Transient failure in kafka.api.SaslScramSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-07-19 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-8692:
--

 Summary: Transient failure in 
kafka.api.SaslScramSslEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
 Key: KAFKA-8692
 URL: https://issues.apache.org/jira/browse/KAFKA-8692
 Project: Kafka
  Issue Type: Bug
  Components: core, unit tests
Reporter: Bill Bejeck


Failed in build [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/420/]

 
{noformat}
Error Message
org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records


Stacktrace
org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records
        at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
        at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
        at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
        at org.scalatest.Assertions.fail(Assertions.scala:1091)
        at org.scalatest.Assertions.fail$(Assertions.scala:1087)
        at org.scalatest.Assertions$.fail(Assertions.scala:1389)
        at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822)
        at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:781)
        at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1309)
        at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1317)
        at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:522)
        at kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:361)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
        at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
        at jdk.internal.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
        at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at

[jira] [Updated] (KAFKA-8691) Flakey test ProcessorContextTest#shouldNotAllowToScheduleZeroMillisecondPunctuation

2019-07-19 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8691:
---
Component/s: streams

> Flakey test  
> ProcessorContextTest#shouldNotAllowToScheduleZeroMillisecondPunctuation
> 
>
> Key: KAFKA-8691
> URL: https://issues.apache.org/jira/browse/KAFKA-8691
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6384/consoleFull]
> {noformat}
> org.apache.kafka.streams.processor.internals.ProcessorContextTest > shouldNotAllowToScheduleZeroMillisecondPunctuation PASSED
> ERROR: Failed to write output for test null.Gradle Test Executor 5
> java.lang.NullPointerException: Cannot invoke method write() on null object
>         at org.codehaus.groovy.runtime.NullObject.invokeMethod(NullObject.java:91)
>         at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.call(PogoMetaClassSite.java:47)
>         at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)
>         at org.codehaus.groovy.runtime.callsite.NullCallSite.call(NullCallSite.java:34)
>         at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:47)
>         at java_io_FileOutputStream$write.call(Unknown Source)
>         at build_5nv3fyjgqff9aim9wbxfnad9z$_run_closure5$_closure75$_closure108.doCall(/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/build.gradle:244)
> {noformat}





[jira] [Commented] (KAFKA-8666) Improve Documentation on usage of Materialized config object

2019-07-19 Thread Alexander Zafirov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889138#comment-16889138
 ] 

Alexander Zafirov commented on KAFKA-8666:
--

Looking at the JavaDoc, I think it is elaborate enough:
{code:java}
/**
 * Materialize a {@link StateStore} with the provided key and value {@link Serde}s.
 * An internal name will be used for the store.
 */
{code}
Let's hope that people are using modern IDEs that discourage calling the static 
methods on `Materialized` objects.

From my standpoint I think this issue should be closed. Hope our discussion is 
useful for the people out there.

Thank you, Matthias.

> Improve Documentation on usage of Materialized config object
> 
>
> Key: KAFKA-8666
> URL: https://issues.apache.org/jira/browse/KAFKA-8666
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: newbie
>
> When using the Materialized object if the user wants to name the statestore 
> with
> {code:java}
> Materialized.as("MyStoreName"){code}
> then subsequently provide the key and value serde the calls to do so must 
> take the form of 
> {code:java}
> Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde)
> {code}
> If users do the following 
> {code:java}
> Materialized.as("MyStoreName").with(keySerde, valueSerde)
> {code}
> the Materialized instance created by the "as(storeName)" call is replaced by 
> a new Materialized instance resulting from the "with(...)" call and any 
> configuration on the first Materialized instance is lost.





[jira] [Commented] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-07-19 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889109#comment-16889109
 ] 

Boyang Chen commented on KAFKA-8677:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6392/console]

> Flakey test 
> GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> 
>
> Key: KAFKA-8677
> URL: https://issues.apache.org/jira/browse/KAFKA-8677
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]
>  
> {noformat}
> kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED
> kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout
> kafka.api.GroupEndToEndAuthorizationTest > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED
>     org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout instead of the expected 1 records
> {noformat}





[jira] [Resolved] (KAFKA-8513) Add kafka-streams-application-reset.bat for Windows platform

2019-07-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8513.

   Resolution: Fixed
Fix Version/s: 2.4.0

> Add kafka-streams-application-reset.bat for Windows platform
> 
>
> Key: KAFKA-8513
> URL: https://issues.apache.org/jira/browse/KAFKA-8513
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Minor
> Fix For: 2.4.0
>
>
> For improving Windows support, it'd be nice if there were a batch file 
> corresponding to bin/kafka-streams-application-reset.sh.





[jira] [Commented] (KAFKA-8666) Improve Documentation on usage of Materialized config object

2019-07-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889058#comment-16889058
 ] 

Matthias J. Sax commented on KAFKA-8666:


{quote}we are throwing away information that the user already provided if they 
choose to build their object
{quote}
Not sure if I agree with this statement. Assume there would not be any static 
methods, but a public constructor for `Materialized`:

 
{code:java}
// current API
builder.table(..., Materialized.as("storeName").withKeySerde(keySerde).withValueSerde(valueSerde));
// or
builder.table(..., Materialized.with(keySerde, valueSerde));

// if we did not have static methods but public constructors, it would look like this:
builder.table(..., new Materialized("storeName").withKeySerde(keySerde).withValueSerde(valueSerde));
// or
builder.table(..., new Materialized(keySerde, valueSerde));{code}
 

If you call
{code:java}
builder.table(..., Materialized.as("storeName").with(keySerde, valueSerde));{code}
it's similar to calling "new" twice and using the second object:
{code:java}
Materialized m;
m = new Materialized("storeName");
m = new Materialized(keySerde, valueSerde);
builder.table(..., m);{code}
Hence, KS does not "throw away" information IMHO. For the "new" case it's 
obvious that the second object will not have a store name set – it's two 
objects after all. It might be somewhat subtle, but calling a static method in 
a builder-like pattern is like calling "new" (and a modern IDE should actually 
give you a warning about it – a static method should never be called on an 
object, but only on the class).

 
{quote}Before going into solution mode I'd like to agree on something - we _do_ 
want the described functionality to be available, right? As you said, we are 
using the `builder pattern` for that reason, because the commit message in the 
code says:

_Add a `with(Serde keySerde, Serde valSerde)` to `Materialized` for cases where 
people don't care about the state store name._
{quote}
Yes. There are cases for which only passing in the Serdes is the best way, and 
passing in a store name (which forces store materialization and disables store 
optimization) is undesired.
{quote}That leads me to believe that maybe the code is working as it is 
supposed to, and we should focus on highlighting that to the developer rather 
than finding a way to comply with the request in this issue.
{quote}
Agreed.
{quote}If we do want to continue `building` the `Materialized` object, please 
enlighten me: what is the standard practice with Kafka APIs with regard to 
backwards compatibility? My intuition was that we shouldn't break code that 
relies on the current implementation - which will happen if we attempt to 
provide a fix for the way the current `with(keySerde, valueSerde)` is done. If 
that is not the case, then a way to fix the problem is to remove the `static` 
identifier of the method. That would allow us to access the previously provided 
`storeName`.
{quote}
Yes, code must be backward compatible. We could add a non-static 
`withStoreName(String)` method that allows you to do:
{code:java}
Materialized.with(keySerde, valueSerde).withStoreName("storeName");{code}
But there would still be the problem that one could do it wrong:
{code:java}
Materialized.with(keySerde, valueSerde).as("storeName");{code}
Therefore, I don't see a big advantage in adding a `withStoreName(String)` 
method. We can update the JavaDocs of course, but if people write correct Java 
code and don't call static methods on objects, they do it correctly 
automatically.

> Improve Documentation on usage of Materialized config object
> 
>
> Key: KAFKA-8666
> URL: https://issues.apache.org/jira/browse/KAFKA-8666
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: newbie
>
> When using the Materialized object if the user wants to name the statestore 
> with
> {code:java}
> Materialized.as("MyStoreName"){code}
> then subsequently provide the key and value serde the calls to do so must 
> take the form of 
> {code:java}
> Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde)
> {code}
> If users do the following 
> {code:java}
> Materialized.as("MyStoreName").with(keySerde, valueSerde)
> {code}
> the Materialized instance created by the "as(storeName)" call is replaced by 
> a new Materialized instance resulting from the "with(...)" call and any 
> configuration on the first Materialized instance is lost.





[jira] [Commented] (KAFKA-7849) Warning when adding GlobalKTable

2019-07-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889046#comment-16889046
 ] 

Matthias J. Sax commented on KAFKA-7849:


That sounds correct.

> Warning when adding GlobalKTable
> 
>
> Key: KAFKA-7849
> URL: https://issues.apache.org/jira/browse/KAFKA-7849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Dmitry Minkovsky
>Assignee: Omar Al-Safi
>Priority: Minor
>  Labels: newbie
>
> Per 
> https://lists.apache.org/thread.html/59c119be8a2723c501e0653fa3ed571e8c09be40d5b5170c151528b5@%3Cusers.kafka.apache.org%3E
>  
> When I add a GlobalKTable for topic "message-write-service-user-ids-by-email" 
> to my topology, I get this warning:
>  
> [2019-01-19 12:18:14,008] WARN 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:421) 
> [Consumer 
> clientId=message-write-service-55f2ca4d-0efc-4344-90d3-955f9f5a65fd-StreamThread-2-consumer,
>  groupId=message-write-service] The following subscribed topics are not 
> assigned to any members: [message-write-service-user-ids-by-email] 





[jira] [Commented] (KAFKA-8687) Pass store name when creating a Materialized using with(keySerde, valueSerde)

2019-07-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889045#comment-16889045
 ] 

Matthias J. Sax commented on KAFKA-8687:


Why is `Materialized.as("myStoreName").withKeySerde(...).withValueSerde(...)`  
not sufficient?

> Pass store name when creating a Materialized using with(keySerde, valueSerde)
> -
>
> Key: KAFKA-8687
> URL: https://issues.apache.org/jira/browse/KAFKA-8687
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: jmhostalet
>Priority: Minor
>
> current implementation of Materialized does not permit setting the name when 
> using with(keySerde, valueSerde)
> {code:java}
> public static <K, V, S extends StateStore> Materialized<K, V, S> with(Serde<K> keySerde, Serde<V> valueSerde) {
>     return (new Materialized<K, V, S>((String) null)).withKeySerde(keySerde).withValueSerde(valueSerde);
> {code}
> it would be nice to have such a feature, for example:
> {code:java}
> public static <K, V, S extends StateStore> Materialized<K, V, S> with(Serde<K> keySerde, Serde<V> valueSerde) {
>     return with((String) null, keySerde, valueSerde);
> }
> public static <K, V, S extends StateStore> Materialized<K, V, S> with(String name, Serde<K> keySerde, Serde<V> valueSerde) {
>     return (new Materialized<K, V, S>(name)).withKeySerde(keySerde).withValueSerde(valueSerde);
> }
> {code}
>  





[jira] [Created] (KAFKA-8690) Flakey test ConnectWorkerIntegrationTest#testAddAndRemoveWorke

2019-07-19 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8690:
--

 Summary: Flakey test  
ConnectWorkerIntegrationTest#testAddAndRemoveWorke
 Key: KAFKA-8690
 URL: https://issues.apache.org/jira/browse/KAFKA-8690
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23570/consoleFull]
{noformat}
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > testAddAndRemoveWorker STARTED
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker failed, log available in /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker.test.stdout
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest > testAddAndRemoveWorker FAILED
    java.lang.AssertionError: Condition not met within timeout 15000. Connector tasks did not start in time.
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
        at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)
        at org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testAddAndRemoveWorker(ConnectWorkerIntegrationTest.java:118)
{noformat}





[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup

2019-07-19 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888978#comment-16888978
 ] 

Boyang Chen commented on KAFKA-7937:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23570/consoleFull]

> Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
> 
>
> Key: KAFKA-7937
> URL: https://issues.apache.org/jira/browse/KAFKA-7937
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, unit tests
>Affects Versions: 2.2.0, 2.1.1, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Critical
> Fix For: 2.4.0
>
> Attachments: log-job6122.txt
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/19/pipeline
> {quote}kafka.admin.ResetConsumerGroupOffsetTest > 
> testResetOffsetsNotExistingGroup FAILED 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available. at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306)
>  at 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89)
>  Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.{quote}





[jira] [Resolved] (KAFKA-8689) Cannot Name Join State Store Topics

2019-07-19 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-8689.

Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/KAFKA-8558

> Cannot Name Join State Store Topics
> ---
>
> Key: KAFKA-8689
> URL: https://issues.apache.org/jira/browse/KAFKA-8689
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Simon Dean
>Priority: Major
>
> Performing a join on two KStreams produces two state store topics. Currently, 
> the names of the state store topics are auto-generated and cannot be 
> overridden. 
> Example code:
>  
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.LongSerializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.Joined;
> import org.apache.kafka.streams.kstream.KStream;
> import java.time.Duration;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> public class JoinTopicNamesExample {
> public static void main(final String[] args) throws InterruptedException {
> new Thread(() -> {
> produce(args);
> }).run();
> new Thread(() -> {
> try {
> streams(args);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }).run();
> }
> private static void produce(String[] args) {
> Map props = new HashMap<>();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(ProducerConfig.RETRIES_CONFIG, 0);
> props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
> props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
> props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> LongSerializer.class);
> KafkaProducer producer = new KafkaProducer<>(props);
> for (long i = 0; i < 10; i++) {
> producer.send(new ProducerRecord("left", Long.toString(i), i));
> }
> for (long i = 0; i < 10; i++) {
> producer.send(new ProducerRecord("right", Long.toString(i), i));
> }
> }
> private static void streams(String[] args) throws InterruptedException {
> final String bootstrapServers = args.length > 0 ? args[0] : 
> "localhost:9092";
> final Properties streamsConfiguration = new Properties();
> // Give the Streams application a unique name.  The name must be 
> unique in the Kafka cluster
> // against which the application is run.
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "join-topic-names-example");
> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, 
> "user-region-lambda-example-client");
> // Where to find Kafka broker(s).
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
> bootstrapServers);
> // Specify default (de)serializers for record keys and for record 
> values.
> 
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass().getName());
> 
> streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.Long().getClass().getName());
> // Records should be flushed every 10 seconds. This is less than the 
> default
> // in order to keep this example interactive.
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 
> * 1000);
> final Serde stringSerde = Serdes.String();
> final Serde longSerde = Serdes.Long();
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream left = builder.stream("left", 
> Consumed.with(stringSerde, longSerde));
> final KStream right = builder.stream("right", 
> Consumed.with(stringSerde, longSerde));
> left.join(
> right,
> (value1, value2) -> value1 + value2,
> JoinWindows.of(Duration.ofHours(1)), 
> Joined.as("sum"));
> 

[jira] [Commented] (KAFKA-8689) Cannot Name Join State Store Topics

2019-07-19 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888974#comment-16888974
 ] 

Bill Bejeck commented on KAFKA-8689:


Hi [~msmsimondean]

Thanks for filing a Jira on this item and your timing is impeccable.

We currently have a KIP to remedy this situation - 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join].
  Here's the corresponding discussion thread so far 
[https://www.mail-archive.com/dev@kafka.apache.org/msg98777.html].  Feel free 
to add any additional comments or questions you may have on the discussion 
thread.

Here's the corresponding Jira https://issues.apache.org/jira/browse/KAFKA-8558.

I'm going to close this ticket as a duplicate of KAFKA-8558, and you'll be able 
to track progress on this issue via that ticket and the corresponding PR 
(coming soon).

 

Thanks again for your interest.

 

> Cannot Name Join State Store Topics
> ---
>
> Key: KAFKA-8689
> URL: https://issues.apache.org/jira/browse/KAFKA-8689
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Simon Dean
>Priority: Major
>
> Performing a join on two KStreams produces two state store topics. Currently, 
> the names of the state store topics are auto-generated and cannot be 
> overridden. 
> Example code:
>  
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.LongSerializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.Joined;
> import org.apache.kafka.streams.kstream.KStream;
> import java.time.Duration;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> public class JoinTopicNamesExample {
> public static void main(final String[] args) throws InterruptedException {
> new Thread(() -> {
> produce(args);
> }).run();
> new Thread(() -> {
> try {
> streams(args);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }).run();
> }
> private static void produce(String[] args) {
> Map props = new HashMap<>();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(ProducerConfig.RETRIES_CONFIG, 0);
> props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
> props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
> props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> LongSerializer.class);
> KafkaProducer producer = new KafkaProducer<>(props);
> for (long i = 0; i < 10; i++) {
> producer.send(new ProducerRecord("left", Long.toString(i), i));
> }
> for (long i = 0; i < 10; i++) {
> producer.send(new ProducerRecord("right", Long.toString(i), i));
> }
> }
> private static void streams(String[] args) throws InterruptedException {
> final String bootstrapServers = args.length > 0 ? args[0] : 
> "localhost:9092";
> final Properties streamsConfiguration = new Properties();
> // Give the Streams application a unique name.  The name must be 
> unique in the Kafka cluster
> // against which the application is run.
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
> "join-topic-names-example");
> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, 
> "user-region-lambda-example-client");
> // Where to find Kafka broker(s).
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
> bootstrapServers);
> // Specify default (de)serializers for record keys and for record 
> values.
> 
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
> Serdes.String().getClass().getName());
> 
> streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
> Serdes.Long().getClass().getName());
> // Records should be flushed every 10 seconds. This is less than the 
> default
> // in order 

[jira] [Updated] (KAFKA-8558) KIP-479 - Add Materialized Overload to KStream#Join

2019-07-19 Thread Bill Bejeck (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-8558:
---
Description: 
To prevent a topology incompatibility with the release of 2.4 and the naming of 
Join operations we'll add an overloaded KStream#join method accepting a 
Materialized parameter. This will allow users to explicitly name state stores 
created by Kafka Streams in the join operation.

 

The overloads will apply to all flavors of KStream#join (inner, left, and 
right). 

  was:
To prevent a topology incompatibility with the release of 2.4 and the naming of 
Join operations we'll add an overloaded KStream#join method accepting a 
Materialized parameter. The overloads will apply to all flavors of KStream#join 
(inner, left, and right). 

Additionally, new methods withQueryingEnabled and withQueryingDisabled are 
going to be added to Materialized


> KIP-479 - Add Materialized Overload to KStream#Join 
> 
>
> Key: KAFKA-8558
> URL: https://issues.apache.org/jira/browse/KAFKA-8558
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.4.0
>
>
> To prevent a topology incompatibility with the release of 2.4 and the naming 
> of Join operations we'll add an overloaded KStream#join method accepting a 
> Materialized parameter. This will allow users to explicitly name state stores 
> created by Kafka Streams in the join operation.
>  
> The overloads will apply to all flavors of KStream#join (inner, left, and 
> right). 
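
For orientation only, a rough sketch of what such an overload could look like (a hypothetical shape, not the final API: parameter order and the WindowStore-backed materialization are assumptions here, and the actual signature is up to the KIP-479 discussion):
{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.WindowStore;

// Hypothetical overload shape only; not the final KIP-479 API.
public interface KStreamJoinOverloadSketch<K, V> {
    <VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream,
                                 ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 JoinWindows windows,
                                 Joined<K, V, VO> joined,
                                 Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
}
{code}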





[jira] [Comment Edited] (KAFKA-4790) Kafka cannot recover after a disk full

2019-07-19 Thread bugnenno (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1628#comment-1628
 ] 

bugnenno edited comment on KAFKA-4790 at 7/19/19 2:35 PM:
--

This happens also in 2.1.1.

I removed the property "log.index.size.max.bytes" from my custom settings and 
the error did not appear again.


was (Author: bugnenno):
This happens also in 2.1.1

> Kafka cannot recover after a disk full
> --
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1, 2.1.0
>Reporter: Pengwei
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.<init>(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
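
For context on the number in the error (an inference, not stated in the ticket): offset-index entries are 8 bytes each (a 4-byte relative offset plus a 4-byte file position), so a "full index (size = 128000)" matches a log.index.size.max.bytes of roughly 1 MB:
{code:java}
public class IndexSizeMath {
    public static void main(String[] args) {
        long maxIndexSizeBytes = 1_024_000L; // hypothetical log.index.size.max.bytes value
        long entrySizeBytes = 8L;            // 4-byte relative offset + 4-byte position
        // 1,024,000 / 8 = 128,000 entries, matching "full index (size = 128000)"
        System.out.println(maxIndexSizeBytes / entrySizeBytes);
    }
}
{code}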





[jira] [Created] (KAFKA-8689) Cannot Name Join State Store Topics

2019-07-19 Thread Simon Dean (JIRA)
Simon Dean created KAFKA-8689:
-

 Summary: Cannot Name Join State Store Topics
 Key: KAFKA-8689
 URL: https://issues.apache.org/jira/browse/KAFKA-8689
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.0
Reporter: Simon Dean


Performing a join on two KStreams produces two state store topics. Currently, 
the names of the state store topics are auto-generated and cannot be overridden. 

Example code:

 
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class JoinTopicNamesExample {

    public static void main(final String[] args) throws InterruptedException {
        // start() (rather than run()) so each task executes on its own thread;
        // run() would simply execute the lambda on the calling thread.
        new Thread(() -> {
            produce(args);
        }).start();

        new Thread(() -> {
            try {
                streams(args);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private static void produce(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);

        KafkaProducer<String, Long> producer = new KafkaProducer<>(props);

        for (long i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("left", Long.toString(i), i));
        }

        for (long i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("right", Long.toString(i), i));
        }
    }

    private static void streams(String[] args) throws InterruptedException {
        final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
        final Properties streamsConfiguration = new Properties();
        // Give the Streams application a unique name.  The name must be unique
        // in the Kafka cluster against which the application is run.
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-topic-names-example");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "user-region-lambda-example-client");
        // Where to find Kafka broker(s).
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Specify default (de)serializers for record keys and for record values.
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        // Records should be flushed every 10 seconds. This is less than the
        // default in order to keep this example interactive.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, Long> left = builder.stream("left", Consumed.with(stringSerde, longSerde));
        final KStream<String, Long> right = builder.stream("right", Consumed.with(stringSerde, longSerde));

        left.join(
            right,
            (value1, value2) -> value1 + value2,
            JoinWindows.of(Duration.ofHours(1)),
            Joined.as("sum"));

        final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

        streams.start();

        Thread.sleep(TimeUnit.MINUTES.toMillis(1));

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}
{code}
 

 

Here are the topics produced by the example code:
 * 

[jira] [Created] (KAFKA-8688) Upgrade system tests fail due to data loss with older message format

2019-07-19 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-8688:
-

 Summary: Upgrade system tests fail due to data loss with older 
message format
 Key: KAFKA-8688
 URL: https://issues.apache.org/jira/browse/KAFKA-8688
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram


System test failure for TestUpgrade/test_upgrade: from_kafka_version=0.9.0.1, 
to_message_format_version=0.9.0.1, compression_types=.lz4
{code:java}
3 acked message did not make it to the Consumer. They are: [33906, 33900, 
33903]. The first 3 missing messages were validated to ensure they are in 
Kafka's data files. 3 were missing. This suggests data loss. Here are some of 
the messages not found in the data files: [33906, 33900, 33903]

Traceback (most recent call last):
  File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py",
 line 132, in run
    data = self.run_test()
  File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/tests/runner_client.py",
 line 189, in run_test
    return self.test_context.function(self.test)
  File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.5-py2.7.egg/ducktape/mark/_mark.py",
 line 428, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/core/upgrade_test.py",
 line 136, in test_upgrade
    self.run_produce_consume_validate(core_test_action=lambda: 
self.perform_upgrade(from_kafka_version,
  File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/produce_consume_validate.py",
 line 112, in run_produce_consume_validate
    self.validate()
  File 
"/home/jenkins/workspace/system-test-kafka_5.3.x/kafka/tests/kafkatest/tests/produce_consume_validate.py",
 line 135, in validate
    assert succeeded, error_msg
AssertionError: 3 acked message did not make it to the Consumer. They are: 
[33906, 33900, 33903]. The first 3 missing messages were validated to ensure 
they are in Kafka's data files. 3 were missing. This suggests data loss. Here 
are some of the messages not found in the data files: [33906, 33900, 33903]
{code}
Logs show:
 # Broker 1 is leader of the partition
 # Broker 2 successfully fetches from offset 10947 and processes the request
 # Broker 2 sends a fetch request to broker 1 for offset 10950
 # Broker 1 sets its HW to 10950 and acknowledges produce requests up to the HW
 # Broker 2 is elected leader
 # Broker 2 truncates to its local HW of 10947 - 3 messages are lost

This data loss is a known issue that was fixed under KIP-101. But since it can 
still happen with older message formats, we should update the upgrade tests to 
cope with some data loss.
  





[jira] [Commented] (KAFKA-4790) Kafka cannot recover after a disk full

2019-07-19 Thread bugnenno (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1628#comment-1628
 ] 

bugnenno commented on KAFKA-4790:
-

This happens also in 2.1.1

> Kafka cannot recover after a disk full
> --
>
> Key: KAFKA-4790
> URL: https://issues.apache.org/jira/browse/KAFKA-4790
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1, 0.10.1.1, 2.1.0
>Reporter: Pengwei
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> [2017-02-23 18:43:57,736] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2017-02-23 18:43:57,887] INFO Loading logs. (kafka.log.LogManager)
> [2017-02-23 18:43:57,935] INFO Recovering unflushed segment 0 in log test1-0. 
> (kafka.log.Log)
> [2017-02-23 18:43:59,297] ERROR There was an error in one of the threads 
> during logs loading: java.lang.IllegalArgumentException: requirement failed: 
> Attempt to append to a full index (size = 128000). (kafka.log.LogManager)
> [2017-02-23 18:43:59,299] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.IllegalArgumentException: requirement failed: Attempt to append to 
> a full index (size = 128000).
>   at scala.Predef$.require(Predef.scala:219)
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:200)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:199)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:199)
>   at kafka.log.LogSegment.recover(LogSegment.scala:191)
>   at kafka.log.Log.recoverLog(Log.scala:259)
>   at kafka.log.Log.loadSegments(Log.scala:234)
>   at kafka.log.Log.<init>(Log.scala:92)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$4$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:201)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
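For readers unfamiliar with the failure, a simplified sketch of the invariant that trips here (the real OffsetIndex is Scala; names below are illustrative): after a disk-full crash, log recovery re-appends entries into an index segment that is already at capacity.
{code:java}
// Toy model of OffsetIndex.append's require(...) check, not the real class.
class OffsetIndexSketch {
    private final int maxEntries;
    private int entries;

    OffsetIndexSketch(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    void append(long offset, int filePosition) {
        if (entries >= maxEntries) {
            // Mirrors the Scala require(...) failure seen in the stack trace.
            throw new IllegalArgumentException(
                "requirement failed: Attempt to append to a full index (size = " + entries + ").");
        }
        entries++; // ... write the (offset, filePosition) entry ...
    }
}
{code}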



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8687) Pass store name when creating a Materialized using with(keySerde, valueSerde)

2019-07-19 Thread jmhostalet (JIRA)
jmhostalet created KAFKA-8687:
-

 Summary: Pass store name when creating a Materialized using 
with(keySerde, valueSerde)
 Key: KAFKA-8687
 URL: https://issues.apache.org/jira/browse/KAFKA-8687
 Project: Kafka
  Issue Type: Wish
  Components: streams
Affects Versions: 2.3.0
Reporter: jmhostalet


The current implementation of Materialized does not permit setting the store 
name when using with(keySerde, valueSerde):
{code:java}
public static <K, V, S extends StateStore> Materialized<K, V, S> with(Serde<K> keySerde, Serde<V> valueSerde) {
    return (new Materialized<K, V, S>((String) null)).withKeySerde(keySerde).withValueSerde(valueSerde);
}
{code}
It would be nice to have such a feature, for example:
{code:java}
public static <K, V, S extends StateStore> Materialized<K, V, S> with(Serde<K> keySerde, Serde<V> valueSerde) {
    return with((String) null, keySerde, valueSerde);
}

public static <K, V, S extends StateStore> Materialized<K, V, S> with(String name, Serde<K> keySerde, Serde<V> valueSerde) {
    return (new Materialized<K, V, S>(name)).withKeySerde(keySerde).withValueSerde(valueSerde);
}
{code}
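Usage under the proposal would then look roughly like this (hypothetical, since the overload does not exist today):
{code:java}
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized =
    Materialized.with("my-store-name", Serdes.String(), Serdes.Long());
{code}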
 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-8646) Materialized.withLoggingDisabled() does not disable changelog topics creation

2019-07-19 Thread jmhostalet (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jmhostalet resolved KAFKA-8646.
---
Resolution: Not A Bug

> Materialized.withLoggingDisabled() does not disable changelog topics creation
> -
>
> Key: KAFKA-8646
> URL: https://issues.apache.org/jira/browse/KAFKA-8646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: jmhostalet
>Assignee: Bill Bejeck
>Priority: Minor
>
> I have a cluster with 3 brokers running version 0.11.
> My kafka-streams app was using kafka-client 0.11.0.1, but recently I've 
> migrated to 2.3.0.
> I have not executed any migration, as my data is disposable; therefore I have 
> deleted all intermediate topics, except the input and output topics.
> My streams config is: 
> {code:java}
> application.id = consumer-id-v1.00
> application.server =
> bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 524288000
> client.id =
> commit.interval.ms = 3
> connections.max.idle.ms = 54
> default.deserialization.exception.handler = class 
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.key.serde = class 
> org.apache.kafka.common.serialization.Serdes$StringSerde
> default.production.exception.handler = class 
> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class com.acme.stream.TimeExtractor
> default.value.serde = class com.acme.serde.MyDtoSerde
> max.task.idle.ms = 0
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 3
> num.standby.replicas = 0
> num.stream.threads = 25
> partition.grouper = class 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 4
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 60
> state.dir = /tmp/kafka-streams
> topology.optimization = none
> upgrade.from = null
> windowstore.changelog.additional.retention.ms = 8640
> {code}
> In my stream I am using withLoggingDisabled:
> {code:java}
> stream.filter((key, val) -> val!=null)
> .selectKey((key, val) -> getId(val))
> .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new 
> MyDtoSerde()))
> .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
>.grace(windowRetentionPeriodDuration))
> .aggregate(MyDto::new,
>new MyUpdater(),
>Materialized.as("aggregation-updater")
>.withLoggingDisabled()
>.with(Serdes.String(), new MyDtoSerde()))
> .toStream((k, v) -> k.key())
> .mapValues(val -> { ...
> {code}
> but changelog topics are created (KSTREAM-AGGREGATE-STATE-STORE), no matter 
> whether I delete them before running the app again or change the 
> application.id.
> With a new application.id, the topics are recreated with the new prefix.
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8646) Materialized.withLoggingDisabled() does not disable changelog topics creation

2019-07-19 Thread jmhostalet (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1601#comment-1601
 ] 

jmhostalet commented on KAFKA-8646:
---

Hi Bill,

After a huge effort to match the generic types, it worked, thanks to your 
advice.

Thank you very much!
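For reference, a sketch of what the corrected call might look like, assuming the advice was to replace the static `with(keySerde, valueSerde)` (which discards the named instance) with the instance setters, plus an explicit type witness to satisfy the generics:
{code:java}
Materialized.<String, MyDto, WindowStore<Bytes, byte[]>>as("aggregation-updater")
    .withLoggingDisabled()
    .withKeySerde(Serdes.String())
    .withValueSerde(new MyDtoSerde())
{code}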

 

> Materialized.withLoggingDisabled() does not disable changelog topics creation
> -
>
> Key: KAFKA-8646
> URL: https://issues.apache.org/jira/browse/KAFKA-8646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: jmhostalet
>Assignee: Bill Bejeck
>Priority: Minor
>
> I have a cluster with 3 brokers running version 0.11.
> My kafka-streams app was using kafka-client 0.11.0.1, but recently I've 
> migrated to 2.3.0.
> I have not executed any migration, as my data is disposable; therefore I have 
> deleted all intermediate topics, except the input and output topics.
> My streams config is: 
> {code:java}
> application.id = consumer-id-v1.00
> application.server =
> bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092]
> buffered.records.per.partition = 1000
> cache.max.bytes.buffering = 524288000
> client.id =
> commit.interval.ms = 3
> connections.max.idle.ms = 54
> default.deserialization.exception.handler = class 
> org.apache.kafka.streams.errors.LogAndFailExceptionHandler
> default.key.serde = class 
> org.apache.kafka.common.serialization.Serdes$StringSerde
> default.production.exception.handler = class 
> org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
> default.timestamp.extractor = class com.acme.stream.TimeExtractor
> default.value.serde = class com.acme.serde.MyDtoSerde
> max.task.idle.ms = 0
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 3
> num.standby.replicas = 0
> num.stream.threads = 25
> partition.grouper = class 
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = at_least_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 4
> retries = 0
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 60
> state.dir = /tmp/kafka-streams
> topology.optimization = none
> upgrade.from = null
> windowstore.changelog.additional.retention.ms = 8640
> {code}
> In my stream I am using withLoggingDisabled:
> {code:java}
> stream.filter((key, val) -> val!=null)
> .selectKey((key, val) -> getId(val))
> .groupByKey(Grouped.as("key-grouper").with(Serdes.String(), new 
> MyDtoSerde()))
> .windowedBy(TimeWindows.of(aggregationWindowSizeDuration)
>.grace(windowRetentionPeriodDuration))
> .aggregate(MyDto::new,
>new MyUpdater(),
>Materialized.as("aggregation-updater")
>.withLoggingDisabled()
>.with(Serdes.String(), new MyDtoSerde()))
> .toStream((k, v) -> k.key())
> .mapValues(val -> { ...
> {code}
> but changelog topics are created (KSTREAM-AGGREGATE-STATE-STORE), no matter 
> whether I delete them before running the app again or change the 
> application.id.
> With a new application.id, the topics are recreated with the new prefix.
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-1111) Broker prematurely accepts TopicMetadataRequests on startup

2019-07-19 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888789#comment-16888789
 ] 

Sönke Liebau commented on KAFKA-:
-

I've not tested this, but there has been some work around TopicMetadataRequests 
creating topics, most notably KAFKA-5291, which is at least related to this but 
certainly doesn't fix it.

Looking at the date on this, though: is this still an issue, or can we close it?

> Broker prematurely accepts TopicMetadataRequests on startup
> ---
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>Assignee: Neha Narkhede
>Priority: Major
>
> I have an issue where on startup, the broker starts accepting 
> TopicMetadataRequests before it has had metadata sync'd from the controller.  
> This results in a bunch of log entries that look like this:
> 2013-11-01 03:26:01,577  INFO [kafka-request-handler-0] admin.AdminUtils$ - 
> Topic creation { "partitions":{ "0":[ 9, 10 ] }, "version":1 }
> 2013-11-01 03:26:07,767  INFO [kafka-request-handler-1] admin.AdminUtils$ - 
> Topic creation { "partitions":{ "0":[ 9, 11 ] }, "version":1 }
> 2013-11-01 03:26:07,823  INFO [kafka-request-handler-1] admin.AdminUtils$ - 
> Topic creation { "partitions":{ "0":[ 10, 11 ] }, "version":1 }
> 2013-11-01 03:26:11,183  INFO [kafka-request-handler-2] admin.AdminUtils$ - 
> Topic creation { "partitions":{ "0":[ 10, 11 ] }, "version":1 }
> From an email thread, Neha remarks:
> Before a broker receives the first
> LeaderAndIsrRequest/UpdateMetadataRequest, it is technically not ready to
> start serving any request. But it still ends up serving
> TopicMetadataRequest which can re-create topics accidentally. It shouldn't
> succeed, but this is still a problem.
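As a side note, newer clients can at least opt out of triggering auto-creation via metadata requests (a mitigation sketch, not a fix for the startup race described here; KIP-361, Kafka 2.3+):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Consumer-side opt-out from topic auto-creation on metadata lookups.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
{code}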



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-1099) StopReplicaRequest and StopReplicaResponse should also carry the replica ids

2019-07-19 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-1099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888786#comment-16888786
 ] 

Sönke Liebau commented on KAFKA-1099:
-

Pretty sure this information has not been included in the requests until now 
(at least I cannot find it in the code). But as this is now 6 years old: is it 
still relevant, or has it become obsolete through other changes in the meantime?
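To illustrate what "carrying the replica id" could look like, a hypothetical wire shape (not Kafka's actual protocol classes):
{code:java}
// Hypothetical request/response shapes; today the replica id is implicit in
// the connection, which is what this ticket complains about.
record StopReplicaRequestSketch(int controllerId, int controllerEpoch,
                                int replicaId, java.util.List<String> partitions) {}

record StopReplicaResponseSketch(int replicaId,
                                 java.util.Map<String, Short> errorsPerPartition) {}
{code}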

> StopReplicaRequest and StopReplicaResponse should also carry the replica ids
> 
>
> Key: KAFKA-1099
> URL: https://issues.apache.org/jira/browse/KAFKA-1099
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Major
>
> The stop replica request and response only contain a list of partitions for 
> which a replica should be moved to offline/nonexistent state. But the replica 
> id information is implicit in the network layer as the receiving broker. This 
> complicates stop replica response handling on the controller. This blocks the 
> right fix for KAFKA-1097 since it requires invoking callback for processing a 
> StopReplicaResponse and it requires to know the replica id from the 
> StopReplicaResponse.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-1234) All kafka-run-class.sh to source in user config file (to set env vars like KAFKA_OPTS)

2019-07-19 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-1234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888782#comment-16888782
 ] 

Sönke Liebau commented on KAFKA-1234:
-

I'm unsure what the "gospel" way of doing this is, but my personal belief is 
that tools that run Kafka as a service should allow for management of 
environment variables.
Most prevalent these days is probably systemd, which makes this very easy. I 
have limited experience with Upstart but do believe this is possible, if 
somewhat awkward. For init.d scripts I have not really found an acceptable 
solution apart from sourcing an env file from the init.d script; but as Kafka 
does not ship init.d scripts, you would have to create them yourself anyway, so 
sourcing something should not create any conflicts with updates, etc.

Relying on a file in the Kafka base dir bears the risk of losing this file 
during an update, depending on how that update is performed, so I'd prefer 
relying on external methods.
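For example, with systemd the whole problem reduces to a drop-in unit override (unit name, paths, and values below are purely illustrative):
{code}
# Hypothetical drop-in: /etc/systemd/system/kafka.service.d/env.conf
[Service]
Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/jaas.conf"
# Optionally source a whole env file; the leading '-' makes it optional.
EnvironmentFile=-/etc/default/kafka
{code}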



> All kafka-run-class.sh to source in user config file (to set env vars like 
> KAFKA_OPTS)
> --
>
> Key: KAFKA-1234
> URL: https://issues.apache.org/jira/browse/KAFKA-1234
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Affects Versions: 0.8.0
>Reporter: Alex Gray
>Priority: Trivial
> Attachments: patch.txt
>
>
> We are using the distributed version of kafka:
> http://apache.mirrors.lucidnetworks.net/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz
> And we would like to set some environment variables, particularly KAFKA_OPTS 
> when the kafka service starts.
> In other words, when someone does a "sudo service kafka start" we would like 
> to set some environment variables.
> We cannot do this *without* modifying either /etc/init.d/kafka or 
> bin/kafka-run-class.sh, and we don't want to modify files that we don't own.
> The solution is to have kafka source in a user specific file that may have 
> these environment variables set.
> I'm attaching the patch file to show you what I mean.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-7849) Warning when adding GlobalKTable

2019-07-19 Thread Omar Al-Safi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888772#comment-16888772
 ] 

Omar Al-Safi commented on KAFKA-7849:
-

Thank you [~mjsax]! I want to validate my findings with you. As I understood 
from the code and {{DEBUG}} logs, {{GlobalStreamThread}} should be responsible 
for subscribing to all the global topics, as I can see 
[here|https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L698];
 therefore {{StreamThread}} shouldn't subscribe to any of these topics, as 
{{GlobalStreamThread}} is solely responsible for them. Based on this 
understanding, I was looking at {{sourceTopicPattern()}} in 
{{InternalTopologyBuilder}}, since it is responsible for returning the topic 
pattern that the consumer in {{StreamThread}} should subscribe to. As far as I 
can see, this method doesn't take the global topics into account, even though 
we have this information in the {{globalTopics}} HashSet. Therefore I believe 
that to fix this issue we just need to filter out all global topics in that 
method before returning the topic pattern (a rough sketch follows below). Do 
you think that makes sense?
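A rough sketch of that filtering step (simplified and hypothetical; the real logic lives in {{InternalTopologyBuilder}}):
{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

class SourcePatternSketch {
    final Set<String> sourceTopics = new HashSet<>(); // topics of all sources
    final Set<String> globalTopics = new HashSet<>(); // topics of global stores

    Pattern sourceTopicPattern() {
        Set<String> subscribed = new TreeSet<>(sourceTopics);
        subscribed.removeAll(globalTopics); // the proposed filtering step
        return Pattern.compile(subscribed.stream()
                .map(Pattern::quote)
                .collect(Collectors.joining("|")));
    }
}
{code}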

> Warning when adding GlobalKTable
> 
>
> Key: KAFKA-7849
> URL: https://issues.apache.org/jira/browse/KAFKA-7849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Dmitry Minkovsky
>Assignee: Omar Al-Safi
>Priority: Minor
>  Labels: newbie
>
> Per 
> https://lists.apache.org/thread.html/59c119be8a2723c501e0653fa3ed571e8c09be40d5b5170c151528b5@%3Cusers.kafka.apache.org%3E
>  
> When I add a GlobalKTable for topic "message-write-service-user-ids-by-email" 
> to my topology, I get this warning:
>  
> [2019-01-19 12:18:14,008] WARN 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:421) 
> [Consumer 
> clientId=message-write-service-55f2ca4d-0efc-4344-90d3-955f9f5a65fd-StreamThread-2-consumer,
>  groupId=message-write-service] The following subscribed topics are not 
> assigned to any members: [message-write-service-user-ids-by-email] 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-822) Reassignment of partitions needs a cleanup

2019-07-19 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888771#comment-16888771
 ] 

Sönke Liebau commented on KAFKA-822:


While I'm not 100% sure how this looks in today's code, I do believe that the 
/admin/reassign_partitions znode at least gets deleted after a finished 
reassignment (see for example KAFKA-6193); a quick check is sketched below.

I'd be in favor of closing this due to no activity for more than 6 years.
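A minimal sketch of that check (assuming a plain ZooKeeper client handle; the helper name is hypothetical):
{code:java}
import org.apache.zookeeper.ZooKeeper;

class ReassignmentCheck {
    // Under the current behavior, a reassignment is in flight iff the znode exists.
    static boolean inProgress(ZooKeeper zk) throws Exception {
        return zk.exists("/admin/reassign_partitions", false) != null;
    }
}
{code}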

> Reassignment of partitions needs a cleanup
> --
>
> Key: KAFKA-822
> URL: https://issues.apache.org/jira/browse/KAFKA-822
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, tools
>Affects Versions: 0.8.0
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>Priority: Major
>  Labels: bugs
>
> 1. This is probably a left-over from when the ReassignPartitionsCommand used 
> to be blocking: 
> Currently, for each partition that is reassigned, the controller deletes the 
> /admin/reassign_partitions zk path and repopulates it with a new list from 
> which the reassigned partition has been removed. This is probably overkill; 
> we could delete the zk path completely once the reassignment of all 
> partitions has completed, successfully or in error. 
> 2. It will help to clarify that there could be no replicas that have started 
> and are not in the ISR when KafkaController.onPartitionReassignment() is 
> called.
> 3. We should batch the requests in 
> KafkaController.StopOldReplicasOfReassignedPartition()
> 4. Update controllerContext.partitionReplicaAssignment only once in 
> KafkaController.updateAssignedReplicasForPartition().
> 5. Need to thoroughly test.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-1016) Broker should limit purgatory size

2019-07-19 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888768#comment-16888768
 ] 

Sönke Liebau commented on KAFKA-1016:
-

Is this still relevant after the Purgatory redesign in KAFKA-1430 and 
KAFKA-1989 ?
It seems to me that the vast performance improvements made there would at 
least alleviate the issue described here, even if no hard limit is introduced 
(which I'm not sure we want to do, as that would basically mean limiting the 
number of consumers we are willing to serve).

> Broker should limit purgatory size
> --
>
> Key: KAFKA-1016
> URL: https://issues.apache.org/jira/browse/KAFKA-1016
> Project: Kafka
>  Issue Type: Bug
>  Components: purgatory
>Affects Versions: 0.8.0
>Reporter: Chris Riccomini
>Assignee: Joel Koshy
>Priority: Major
>
> I recently ran into a case where a poorly configured Kafka consumer was able 
> to trigger out of memory exceptions in multiple Kafka brokers. The consumer 
> was configured to have a fetcher.max.wait of Int.MaxInt.
> For low volume topics, this configuration causes the consumer to block 
> frequently, and for long periods of time. [~junrao] informs me that the fetch 
> request will time out after the socket timeout is reached. In our case, this 
> was set to 30s.
> With several thousand consumer threads, the fetch request purgatory got into 
> the 100,000-400,000 range, which we believe triggered the out of memory 
> exception. [~nehanarkhede] claims to have seen similar behavior in other high 
> volume clusters.
> It kind of seems like a bad thing that a poorly configured consumer can 
> trigger out of memory exceptions in the broker. I was thinking maybe it makes 
> sense to have the broker try and protect itself from this situation. Here are 
> some potential solutions:
> 1. Have a broker-side max wait config for fetch requests.
> 2. Threshold the purgatory size, and either drop the oldest connections in 
> purgatory, or reject the newest fetch requests when purgatory is full.
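A pseudocode-level sketch of option 2 above (the real purgatory is Scala; names and the threshold are illustrative):
{code:java}
// Reject (i.e. answer immediately) rather than park new delayed fetches once
// the purgatory is over a configured threshold.
if (fetchPurgatory.numDelayed() >= maxPurgatorySize) {
    sendResponseNow(fetchRequest);   // possibly-empty response, no waiting
} else {
    fetchPurgatory.tryCompleteElseWatch(delayedFetch, watchKeys);
}
{code}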



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8666) Improve Documentation on usage of Materialized config object

2019-07-19 Thread Alexander Zafirov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16888684#comment-16888684
 ] 

Alexander Zafirov commented on KAFKA-8666:
--

Makes sense, Matthias. Yet currently, as described in the issue, we are facing 
a contradiction: we are throwing away information that the user has already 
provided if they choose to build their object. That should not be the case.

Before going into solution mode, I'd like to agree on something: we _do_ want 
the described functionality to be available, right? As you said, we are using 
the `builder pattern` for that reason. The commit message of the code says

_Add a `with(Serde keySerde, Serde valSerde)` to `Materialized` for cases where 
people don't care about the state store name._

That leads me to believe that the code may be working as intended, and that we 
should focus on highlighting this to the developer rather than on finding a way 
to comply with the request in this issue.

If we do want to continue `building` the `Materialized` object, please 
enlighten me: what is the standard practice with Kafka APIs with regard to 
backwards compatibility? My intuition was that we shouldn't break code that 
relies on the current implementation, which will happen if we attempt to fix 
the way the current `with(keySerde, valueSerde)` works. If that is not the 
case, then one way to fix the problem is to remove the `static` modifier from 
the method; that would allow us to access the previously provided `storeName` 
(see the sketch below).
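A sketch of that idea (a hypothetical API change, not current Kafka): an instance-level with(...) that preserves the storeName set by as(...):
{code:java}
// Inside Materialized<K, V, S>; 'this' already carries the name from as(storeName).
public Materialized<K, V, S> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
    return this.withKeySerde(keySerde).withValueSerde(valueSerde);
}
{code}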

 

> Improve Documentation on usage of Materialized config object
> 
>
> Key: KAFKA-8666
> URL: https://issues.apache.org/jira/browse/KAFKA-8666
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: newbie
>
> When using the Materialized object if the user wants to name the statestore 
> with
> {code:java}
> Materialized.as("MyStoreName"){code}
> then subsequently provide the key and value serde the calls to do so must 
> take the form of 
> {code:java}
> Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde)
> {code}
> If users do the following 
> {code:java}
> Materialized.as("MyStoreName").with(keySerde, valueSerde)
> {code}
> the Materialized instance created by the "as(storeName)" call is replaced by 
> a new Materialized instance resulting from the "with(...)" call and any 
> configuration on the first Materialized instance is lost.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)