[jira] [Created] (KAFKA-9114) Kafka broker fails to establish secure zookeeper connection via SSL.

2019-10-29 Thread Gangadhar Balikai (Jira)
Gangadhar Balikai created KAFKA-9114:


 Summary: Kafka broker fails to establish secure zookeeper 
connection via SSL.
 Key: KAFKA-9114
 URL: https://issues.apache.org/jira/browse/KAFKA-9114
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.0, 2.3.1
Reporter: Gangadhar Balikai


When I try to enable TLS/SSL between the Kafka broker (tried 2.3.0 and 2.3.1) 
and a 3-node ZooKeeper cluster (3.5.5 and 3.5.6), the broker fails with the 
following stack trace. The stack trace and the Kafka and ZooKeeper 
configurations used are given below.

*JDK*: 1_8_0_161_64

[2019-10-30 03:52:10,036] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

java.io.IOException: Couldn't instantiate org.apache.zookeeper.ClientCnxnSocketNetty
 at org.apache.zookeeper.ZooKeeper.getClientCnxnSocket(ZooKeeper.java:1851)
 at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:453)
 at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:384)
 at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:103)
 at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
 at kafka.server.KafkaServer.createZkClient$1(KafkaServer.scala:364)
 at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:387)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:207)
 at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
 at kafka.Kafka$.main(Kafka.scala:84)
 at kafka.Kafka.main(Kafka.scala)
Caused by: java.lang.NoSuchMethodException: org.apache.zookeeper.ClientCnxnSocketNetty.<init>()
 at java.lang.Class.getConstructor0(Class.java:3082)
 at java.lang.Class.getDeclaredConstructor(Class.java:2178)
 at org.apache.zookeeper.ZooKeeper.getClientCnxnSocket(ZooKeeper.java:1848)
 ... 10 more
[2019-10-30 03:52:10,039] INFO shutting down (kafka.server.KafkaServer)
[2019-10-30 03:52:10,046] INFO shut down completed (kafka.server.KafkaServer)
[2019-10-30 03:52:10,046] ERROR Exiting Kafka. (kafka.server.KafkaServerStartable)
[2019-10-30 03:52:10,048] INFO shutting down (kafka.server.KafkaServer)
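
For context on the NoSuchMethodException above: as the stack trace shows, the ZooKeeper client loads its connection-socket class reflectively and, on this code path, looks up a no-argument constructor. Below is a minimal hypothetical sketch of that lookup (not the actual ZooKeeper source; the zookeeper.clientCnxnSocket property is the real switch, everything else is illustrative). If the ClientCnxnSocketNetty class on the classpath only offers a different constructor signature, e.g. because of a mismatched set of zookeeper/netty jars, exactly this exception results.

{code:java}
// Hypothetical sketch of the reflective lookup implied by the stack trace
// above (ZooKeeper.getClientCnxnSocket); not the actual ZooKeeper source.
public class CnxnSocketLoaderSketch {
    static Object newClientCnxnSocket() throws java.io.IOException {
        String name = System.getProperty(
                "zookeeper.clientCnxnSocket",                  // real client property
                "org.apache.zookeeper.ClientCnxnSocketNIO");   // default implementation
        try {
            // getDeclaredConstructor() with no arguments requires a no-arg
            // constructor. If the ClientCnxnSocketNetty on the classpath
            // lacks one, this throws NoSuchMethodException:
            // ClientCnxnSocketNetty.<init>(), wrapped as the IOException above.
            return Class.forName(name).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            throw new java.io.IOException("Couldn't instantiate " + name, e);
        }
    }
}
{code}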

STEPS.

1) I copied the following ZooKeeper dependencies into the Kafka bin directory.

a) Kafka 2.3.0 and ZooKeeper 3.5.5

"zookeeper-3.5.6.jar" "zookeeper-jute-3.5.6.jar" "netty*.jar" 
"commons-cli-1.2.jar"

b) Kafka 2.3.1 and ZooKeeper 3.5.6

"zookeeper-3.5.6.jar" "zookeeper-jute-3.5.6.jar" 
"netty-buffer-4.1.42.Final.jar" "netty-buffer-4.1.42.Final.LICENSE.txt" 
"netty-codec-4.1.42.Final.jar" "netty-codec-4.1.42.Final.LICENSE.txt" 
"netty-common-4.1.42.Final.jar" "netty-common-4.1.42.Final.LICENSE.txt" 
"netty-handler-4.1.42.Final.jar" "netty-handler-4.1.42.Final.LICENSE.txt" 
"netty-resolver-4.1.42.Final.jar" "netty-resolver-4.1.42.Final.LICENSE.txt" 
"netty-transport-4.1.42.Final.jar" "netty-transport-4.1.42.Final.LICENSE.txt" 
"netty-transport-native-epoll-4.1.42.Final.jar" 
"netty-transport-native-epoll-4.1.42.Final.LICENSE.txt" 
"netty-transport-native-unix-common-4.1.42.Final.jar" 
"netty-transport-native-unix-common-4.1.42.Final.LICENSE.txt" 
"commons-cli-1.2.jar"

*2) Configurations:* 

The *zookeeper* cluster looks healthy with:

1) the following *zoo.conf*:

quorum.auth.server.loginContext=QuorumServer
quorum.auth.learner.loginContext=QuorumLearner
syncLimit=2
tickTime=2000
server.3=broker1:2888:3888
server.2=broker2:2888:3888
server.1=broker3:2888:3888
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
initLimit=10
secureClientPort=2281
quorum.auth.learnerRequireSasl=true
quorum.auth.enableSasl=true
quorum.auth.kerberos.servicePrincipal=servicename/_HOST
quorum.cnxn.threads.size=20
zookeeper.client.secure=true
quorum.auth.serverRequireSasl=true
zookeeper.serverCnxnFactory=org.apache.zookeeper.ClientCnxnSocketNetty
dataDir=../data/zookeeper/data/

2) and with *SERVER_JVMFLAGS* set to:

-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
-Dzookeeper.ssl.client.auth=none

[jira] [Resolved] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state

2019-10-29 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-8972.

Resolution: Fixed

Nice work guys!

> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback 
> state
> 
>
> Key: KAFKA-8972
> URL: https://issues.apache.org/jira/browse/KAFKA-8972
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
> Fix For: 2.4.0
>
>
> Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the 
> following:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> And inside {{onLeavePrepare}} we would look into the assignment and try to 
> revoke them and notify users via {{RebalanceListener#onPartitionsRevoked}}, 
> and then clear the assignment.
> However, the subscription's assignment is already cleared in 
> {{this.subscriptions.unsubscribe();}} which means user's rebalance listener 
> would never be triggered. In other words, from consumer client's pov nothing 
> is owned after unsubscribe, but from the user caller's pov the partitions are 
> not revoked yet. For callers like Kafka Streams which rely on the rebalance 
> listener to maintain their internal state, this leads to inconsistent state 
> management and failure cases.
> Before KIP-429 this issue was hidden away, since every time the consumer 
> re-joined the group it would still revoke everything anyway regardless of the 
> passed-in parameters of the rebalance listener; with KIP-429 it is much easier 
> to reproduce.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then 
> `subscription.unsubscribe`. This way we are guaranteed that the streams' tasks 
> are all closed as revoked by then.
> • [Optimization] If the generation is reset due to fatal error from join / hb 
> response etc, then we know that all partitions are lost, and we should not 
> trigger `onPartitionRevoked`, but instead just `onPartitionsLost` inside 
> `onLeavePrepare`.
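
A minimal sketch of the reordering described in the first bullet above (hypothetical code mirroring the names quoted in the description, not the actual KafkaConsumer diff):

{code:java}
// Hypothetical sketch; Coordinator and SubscriptionState stand in for the
// consumer internals named in the description.
class UnsubscribeOrderingSketch {
    interface Coordinator {
        void onLeavePrepare();                 // fires onPartitionsRevoked/onPartitionsLost
        void maybeLeaveGroup(String reason);   // sends LeaveGroup if needed
    }
    interface SubscriptionState { void unsubscribe(); }

    private final Coordinator coordinator;
    private final SubscriptionState subscriptions;

    UnsubscribeOrderingSketch(Coordinator c, SubscriptionState s) {
        this.coordinator = c;
        this.subscriptions = s;
    }

    void unsubscribe() {
        // 1. Run the rebalance-listener callbacks while the assignment is
        //    still populated, so users actually see the revoked partitions.
        coordinator.onLeavePrepare();
        // 2. Leave the group.
        coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
        // 3. Only now clear the subscription/assignment state.
        subscriptions.unsubscribe();
    }
}
{code}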





[jira] [Commented] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-10-29 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962678#comment-16962678
 ] 

ASF GitHub Bot commented on KAFKA-8677:
---

guozhangwang commented on pull request #7613: KAFKA-8677: Simplify the 
best-effort network client poll to never throw exception
URL: https://github.com/apache/kafka/pull/7613
 
 
   Within KafkaConsumer.poll, we have an optimization to try to send the next 
fetch request before returning the data, in order to pipeline the fetch 
requests; however, this pollNoWakeup should NOT throw any exceptions, since at 
this point the fetch position has been updated. If an exception is thrown here, 
and the caller decides to catch it and continue, those records would never be 
returned again, causing data loss.
   
   Also fix the flaky test itself.
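
   For illustration, a hedged sketch of what "best-effort" could look like here, based only on the PR title and description rather than the merged change (names mirror the snippet quoted further down):

{code:java}
// Illustrative sketch: make the pipelined pre-fetch poll best-effort by
// never letting an exception escape once fetch positions have advanced.
// Fetcher and NetworkClient stand in for the consumer internals.
class BestEffortPollSketch {
    interface Fetcher { int sendFetches(); }
    interface NetworkClient { boolean hasPendingRequests(); void pollNoWakeup(); }

    private final Fetcher fetcher;
    private final NetworkClient client;

    BestEffortPollSketch(Fetcher fetcher, NetworkClient client) {
        this.fetcher = fetcher;
        this.client = client;
    }

    void pipelineNextFetchBestEffort() {
        if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
            try {
                client.pollNoWakeup();
            } catch (RuntimeException e) {
                // Swallow: the consumed position is already updated, so
                // throwing here could cause the fetched records to be lost
                // by callers that catch and continue.
            }
        }
    }
}
{code}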
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Flakey test 
> GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> 
>
> Key: KAFKA-8677
> URL: https://issues.apache.org/jira/browse/KAFKA-8677
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Anastasia Vela
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]
>  
> *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > 
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* 
> kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00*
>  *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > 
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* 
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 1 records
> ---
> I found this flaky test is actually exposing a real bug in the consumer: within 
> {{KafkaConsumer.poll}}, we have an optimization to try to send the next fetch 
> request before returning the data, in order to pipeline the fetch requests:
> {code}
> if (!records.isEmpty()) {
>     // before returning the fetched records, we can send off the next round
>     // of fetches and avoid block waiting for their responses to enable
>     // pipelining while the user is handling the fetched records.
>     //
>     // NOTE: since the consumed position has already been updated, we must
>     // not allow wakeups or any other errors to be triggered prior to
>     // returning the fetched records.
>     if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
>         client.pollNoWakeup();
>     }
>     return this.interceptors.onConsume(new ConsumerRecords<>(records));
> }
> {code}
> As the NOTE mentions, this pollNoWakeup should NOT throw any exceptions, since 
> at this point the fetch position has been updated. If an exception is thrown 
> here, and the caller decides to catch it and continue, those records would 
> never be returned again, causing data loss.





[jira] [Commented] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state

2019-10-29 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962673#comment-16962673
 ] 

ASF GitHub Bot commented on KAFKA-8972:
---

guozhangwang commented on pull request #7608: KAFKA-8972 (2.4 blocker): clear 
all state for zombie task on TaskMigratedException
URL: https://github.com/apache/kafka/pull/7608
 
 
   
 



> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback 
> state
> 
>
> Key: KAFKA-8972
> URL: https://issues.apache.org/jira/browse/KAFKA-8972
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
> Fix For: 2.4.0
>
>
> Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the 
> following:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> And inside {{onLeavePrepare}} we would look into the assignment and try to 
> revoke them and notify users via {{RebalanceListener#onPartitionsRevoked}}, 
> and then clear the assignment.
> However, the subscription's assignment is already cleared in 
> {{this.subscriptions.unsubscribe();}} which means user's rebalance listener 
> would never be triggered. In other words, from consumer client's pov nothing 
> is owned after unsubscribe, but from the user caller's pov the partitions are 
> not revoked yet. For callers like Kafka Streams which rely on the rebalance 
> listener to maintain their internal state, this leads to inconsistent state 
> management and failure cases.
> Before KIP-429 this issue was hidden away, since every time the consumer 
> re-joined the group it would still revoke everything anyway regardless of the 
> passed-in parameters of the rebalance listener; with KIP-429 it is much easier 
> to reproduce.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then 
> `subscription.unsubscribe`. This way we are guaranteed that the streams' tasks 
> are all closed as revoked by then.
> • [Optimization] If the generation is reset due to fatal error from join / hb 
> response etc, then we know that all partitions are lost, and we should not 
> trigger `onPartitionRevoked`, but instead just `onPartitionsLost` inside 
> `onLeavePrepare`.





[jira] [Commented] (KAFKA-8939) Flaky test ReassignPartitionsClusterTest#shouldTriggerReassignmentWithZnodePrecedenceOnControllerStartup

2019-10-29 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962668#comment-16962668
 ] 

Sophie Blee-Goldman commented on KAFKA-8939:


h3. Stacktrace

java.lang.AssertionError: expected: but was:
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.failNotEquals(Assert.java:835)
 at org.junit.Assert.assertEquals(Assert.java:120)
 at org.junit.Assert.assertEquals(Assert.java:146)

> Flaky test 
> ReassignPartitionsClusterTest#shouldTriggerReassignmentWithZnodePrecedenceOnControllerStartup
> 
>
> Key: KAFKA-8939
> URL: https://issues.apache.org/jira/browse/KAFKA-8939
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core
>Affects Versions: 2.4.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> h3. Stacktrace
> java.lang.AssertionError: expected: but was: at 
> org.junit.Assert.fail(Assert.java:89) at 
> org.junit.Assert.failNotEquals(Assert.java:835) at 
> org.junit.Assert.assertEquals(Assert.java:120) at 
> org.junit.Assert.assertEquals(Assert.java:146) at 
> kafka.admin.ReassignPartitionsClusterTest.shouldTriggerReassignmentWithZnodePrecedenceOnControllerStartup(ReassignPartitionsClusterTest.scala:687)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
> org.junit.runners.ParentRunner.run(ParentRunner.java:412) at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>  at jdk.internal.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>  at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>  at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>  at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>  at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>  at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) at 
> 

[jira] [Updated] (KAFKA-9113) Clean up task management

2019-10-29 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9113:
---
Component/s: streams

> Clean up task management
> 
>
> Key: KAFKA-9113
> URL: https://issues.apache.org/jira/browse/KAFKA-9113
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Along with KIP-429 we did a lot of refactoring of the task management classes, 
> including the TaskManager and AssignedTasks (and children). While the result is 
> hopefully easier to reason about, there's still significant opportunity for 
> further cleanup, including safer state tracking. Some potential improvements:
> 1) Verify that no tasks are ever in more than one state at once. One 
> possibility is to just check that the suspended, created, restoring, and 
> running maps are all disjoint, but this begs the question of when and where 
> to do those checks, and how often. Another idea might be to put all tasks 
> into a single map and just track their state on a per-task basis. Whatever it 
> is should be aware that some methods are on the critical code path, and 
> should not be burdened with excessive safety checks (e.g. 
> AssignedStreamTasks#process).
> 2) Cleanup of closing and/or shutdown logic – there are some potential 
> improvements to be made here as well; for example, AssignedTasks currently 
> implements a closeZombieTask method despite the fact that standby tasks are 
> never zombies. 
> 3) The StoreChangelogReader also interacts with (only) the 
> AssignedStreamsTasks class, through the TaskManager. It can be difficult to 
> reason about these interactions and the state of the changelog reader.
> 4) All 4 classes and their state have very strict consistency requirements 
> that currently are almost impossible to verify, which has already resulted in 
> several bugs that we were lucky to catch in time. We should tighten up how 
> these classes manage their own state, and how the overall state is managed 
> between them, so that it is easy to make changes without introducing new bugs 
> because one class updated its own state without knowing it needed to tell 
> another class to also update its own.





[jira] [Commented] (KAFKA-9112) Combine streams `onAssignment` with `partitionsAssigned` task creation

2019-10-29 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962667#comment-16962667
 ] 

Sophie Blee-Goldman commented on KAFKA-9112:


The thoughts above are obviously way beyond the scope of this ticket, though 
related. Ideally, in a similar way, we could also fix up subscriptionUserdata 
so that it could just blindly encode the userdata, but given the callback 
inconsistencies noted above, all we can really do for now is consolidate the 
work in onAssignment and onPartitionsAssigned, which alone is guaranteed to be 
called at all, and always at the same point of a rebalance (the end).

> Combine streams `onAssignment` with `partitionsAssigned` task creation
> --
>
> Key: KAFKA-9112
> URL: https://issues.apache.org/jira/browse/KAFKA-9112
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> Task manager needs to call `createTasks` inside the partitionsAssigned callback, 
> which comes after the assignor's `onAssignment` callback. This means that during 
> task creation we rely on status changes driven by intermediate data structures 
> populated by a different callback, which is hard to reason about. We should 
> consider consolidating the logic into one of the two callbacks, preferring 
> `onAssignment` as it contains the full information needed.





[jira] [Comment Edited] (KAFKA-9112) Combine streams `onAssignment` with `partitionsAssigned` task creation

2019-10-29 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962498#comment-16962498
 ] 

Sophie Blee-Goldman edited comment on KAFKA-9112 at 10/30/19 3:59 AM:
--

I agree with this ticket 100% and had the same thought a million times while 
working on KIP-429. I would personally vote to actually combine the 
ConsumerRebalanceListener and ConsumerPartitionAssignor in some way, or at 
least give a number of the assignor's current responsibilities over to the 
listener. In complex systems they need to share state and synchronize, which is 
made worse by the different listener callback semantics of the two rebalance 
protocols. 

eg in eager, onPartitionsRevoked is always called before 
ConsumerPartitionAssignor#subscriptionUserdata and therefore the assignor can 
count on whatever state being "updated" as of the new rebalance – but in a 
cooperative rebalance, ConsumerPartitionAssignor#subscriptionUserdata might be 
the first and only method called at the start of the rebalance (but it also 
might not be!). It's awkward that subscriptionUserdata has to reason about how 
the state may or may not have been updated, according to which protocol – when 
its job should just be to wrap up some data and encode it.

The assignor's current role extends way beyond just assignment, and is just as 
responsible for updating state and reacting to rebalances as the listener. We 
should try and define a clear separation of responsibilities – maybe one class 
that reacts to the occurrence of a rebalance – regardless of protocol, ie 
through onRebalanceBegin and onRebalanceEnd for example – plus the usual 
callbacks to react to the specific partition-ownership events (eg 
onPartitionsRevoked). This one _reacts_ to a rebalance, keeps state, and tells 
the user/app (eg Streams) how to then update its own state and react 
accordingly – this is a true rebalance listener

And another class should be responsible for the actual assignment, such as 
which assignment strategy to support and encode, and of course implementing the 
actual assignment algorithm. This one should most likely be almost, if not 
entirely, stateless – any state it needs can come from the listener, but should 
not be stored or updated by this class. It doesn't react to events, it performs 
functions – assignment, or encoding the subscription, etc. – it's a pure 
assignor.
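
To make the proposal concrete, here is a purely hypothetical sketch of the split (the onRebalanceBegin/onRebalanceEnd names come from the comment above; none of these interfaces are an actual Kafka API):

{code:java}
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Hypothetical: the part that reacts to rebalances and keeps state.
interface RebalanceLifecycleListener {
    void onRebalanceBegin();                                      // protocol-agnostic start hook
    void onPartitionsRevoked(Collection<TopicPartition> parts);   // ownership events, as today
    void onPartitionsAssigned(Collection<TopicPartition> parts);
    void onRebalanceEnd();                                        // protocol-agnostic end hook
}

// Hypothetical "pure assignor": performs functions, holds no state of its own.
interface PureAssignor {
    ByteBuffer subscriptionUserData(Set<String> topics);          // just encode, no state reasoning
    Map<String, List<TopicPartition>> assign(Map<String, ByteBuffer> subscriptionsByMember);
}
{code}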


was (Author: ableegoldman):
I agree with this ticket 100% and had the same thought a million times while 
working on KIP-429. I would personally vote to combine the 
ConsumerRebalanceListener and ConsumerPartitionAssignor completely (or at least 
give a number of the assignor's current responsibilities over to the listener). 
In complex systems they need to share state and synchronize, which is made 
worse by the different listener callback semantics of the two rebalance 
protocols. 

eg in eager, onPartitionsRevoked is always called before 
ConsumerPartitionAssignor#subscriptionUserdata and therefore the assignor can 
count on whatever state being "updated" as of the new rebalance – but in a 
cooperative rebalance, ConsumerPartitionAssignor#subscriptionUserdata might be 
the first and only method called at the start of the rebalance (but it also 
might not be!)

The assignor's current role extends way beyond just assignment, and is just as 
responsible for updating state and reacting to rebalances as the listener. We 
should try and define a clear separation of responsibilities – maybe one class 
that reacts to the occurrence of a rebalance – regardless of protocol, ie 
through onRebalanceBegin and onRebalanceEnd for example – plus the usual 
callbacks to react to the specific partition-ownership events (eg 
onPartitionsRevoked). This one _reacts_ to a rebalance, keeps state, and tells 
the user/app (eg Streams) how to then update its own state and react 
accordingly – this is a true rebalance listener

And another class should be responsible for the actual assignment, such as 
which assignment strategy to support and encode, and of course implementing the 
actual assignment algorithm. This one should most likely be almost, if not 
entirely, stateless – any state it needs can come from the listener, but should 
not be stored or updated by this class. It doesn't react to events, it performs 
functions – assignment, or encoding the subscription, etc. – it's a pure 
assignor.

> Combine streams `onAssignment` with `partitionsAssigned` task creation
> --
>
> Key: KAFKA-9112
> URL: https://issues.apache.org/jira/browse/KAFKA-9112
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> Task manager needs to call `createTasks` inside partitionsAssigned callback, 

[jira] [Updated] (KAFKA-9112) Combine streams `onAssignment` with `partitionsAssigned` task creation

2019-10-29 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9112:
---
Component/s: (was: consumer)

> Combine streams `onAssignment` with `partitionsAssigned` task creation
> --
>
> Key: KAFKA-9112
> URL: https://issues.apache.org/jira/browse/KAFKA-9112
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> Task manager needs to call `createTasks` inside the partitionsAssigned callback, 
> which comes after the assignor's `onAssignment` callback. This means that during 
> task creation we rely on status changes driven by intermediate data structures 
> populated by a different callback, which is hard to reason about. We should 
> consider consolidating the logic into one of the two callbacks, preferring 
> `onAssignment` as it contains the full information needed.





[jira] [Updated] (KAFKA-9112) Combine streams `onAssignment` with `partitionsAssigned` task creation

2019-10-29 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9112:
---
Component/s: consumer

> Combine streams `onAssignment` with `partitionsAssigned` task creation
> --
>
> Key: KAFKA-9112
> URL: https://issues.apache.org/jira/browse/KAFKA-9112
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Priority: Major
>
> Task manager needs to call `createTasks` inside the partitionsAssigned callback, 
> which comes after the assignor's `onAssignment` callback. This means that during 
> task creation we rely on status changes driven by intermediate data structures 
> populated by a different callback, which is hard to reason about. We should 
> consider consolidating the logic into one of the two callbacks, preferring 
> `onAssignment` as it contains the full information needed.





[jira] [Commented] (KAFKA-9025) ZkSecurityMigrator not working with zookeeper chroot

2019-10-29 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962659#comment-16962659
 ] 

huxihx commented on KAFKA-9025:
---

[~lmairbus] You should explicitly specify the chroot when running 
zookeeper-security-migration.sh if a chroot is configured, as shown below:
{code:java}
bin/zookeeper-security-migration.sh --zookeeper.acl=secure 
--zookeeper.connect=localhost:2181/kafka{code}
Could you retry your scenario with the command above?

> ZkSecurityMigrator not working with zookeeper chroot
> 
>
> Key: KAFKA-9025
> URL: https://issues.apache.org/jira/browse/KAFKA-9025
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.3.0
> Environment: Reproduced at least on rhel and macos
>Reporter: Laurent Millet
>Priority: Major
>
> The ZkSecurityMigrator tool fails to handle installations where kafka is 
> configured with a zookeeper chroot (as opposed to using /, the default):
>  * ACLs on existing nodes are not modified (they are left world-modifiable)
>  * New nodes created by the tool are created directly under the zookeeper 
> root instead of under the chroot
> The tool does not emit any message, thus the unsuspecting user can only 
> assume everything went well, when in fact it did not and znodes are still not 
> secure:
> kafka_2.12-2.3.0 $ bin/zookeeper-security-migration.sh --zookeeper.acl=secure 
> --zookeeper.connect=localhost:2181
> kafka_2.12-2.3.0 $
> For example, with kafka configured to use /kafka as chroot 
> (zookeeper.connect=localhost:2181/kafka), the following is observed:
>  * Before running the tool
>  ** Zookeeper top-level nodes (all kafka nodes are under /kafka):
> [zk: localhost:2181(CONNECTED) 1] ls /
> [kafka, zookeeper]
>  ** Example node ACL:
> [zk: localhost:2181(CONNECTED) 2] getAcl /kafka/brokers
> 'world,'anyone
> : cdrwa
>  * After running the tool:
>  ** Zookeeper top-level nodes (kafka nodes created by the tool appeared here):
> [zk: localhost:2181(CONNECTED) 3] ls /
> [admin, brokers, cluster, config, controller, controller_epoch, 
> delegation_token, isr_change_notification, kafka, kafka-acl, 
> kafka-acl-changes, kafka-acl-extended, kafka-acl-extended-changes, 
> latest_producer_id_block, log_dir_event_notification, zookeeper]
>  ** Example node ACL:
> [zk: localhost:2181(CONNECTED) 4] getAcl /kafka/brokers
> 'world,'anyone
> : cdrwa
>  ** New node ACL:
> [zk: localhost:2181(CONNECTED) 5] getAcl /brokers
> 'sasl,'kafka
> : cdrwa
> 'world,'anyone
> : r
>  
>  
>  
>  





[jira] [Updated] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-10-29 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-8677:
-
Description: 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]

 
*18:43:39* kafka.api.GroupEndToEndAuthorizationTest > 
testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* 
kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
 failed, log available in 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00*
 *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > 
testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* 
org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout 
instead of the expected 1 records

---

I found this flaky test is actually exposing a real bug in the consumer: within 
{{KafkaConsumer.poll}}, we have an optimization to try to send the next fetch 
request before returning the data, in order to pipeline the fetch requests:

{code}
if (!records.isEmpty()) {
    // before returning the fetched records, we can send off the next round
    // of fetches and avoid block waiting for their responses to enable
    // pipelining while the user is handling the fetched records.
    //
    // NOTE: since the consumed position has already been updated, we must
    // not allow wakeups or any other errors to be triggered prior to
    // returning the fetched records.
    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
        client.pollNoWakeup();
    }

    return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
{code}

As the NOTE mentions, this pollNoWakeup should NOT throw any exceptions, since 
at this point the fetch position has been updated. If an exception is thrown 
here, and the caller decides to catch it and continue, those records would 
never be returned again, causing data loss.

  was:
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]

 
*18:43:39* kafka.api.GroupEndToEndAuthorizationTest > 
testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* 
kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
 failed, log available in 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00*
 *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > 
testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* 
org.scalatest.exceptions.TestFailedException: Consumed 0 records before timeout 
instead of the expected 1 records


> Flakey test 
> GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> 
>
> Key: KAFKA-8677
> URL: https://issues.apache.org/jira/browse/KAFKA-8677
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Anastasia Vela
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]
>  
> *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > 
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* 
> kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00*
>  *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > 
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* 
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 1 records
> ---
> I found this flaky test is actually exposing a real bug in the consumer: within 
> {{KafkaConsumer.poll}}, we have an optimization to try to send the next fetch 
> request before returning the data, in order to pipeline the fetch requests:
> {code}
> if (!records.isEmpty()) {
> // before returning the fetched records, we can send off 
> the next round of fetches
> // and avoid block 

[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2019-10-29 Thread Ashish Surana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962541#comment-16962541
 ] 

Ashish Surana commented on KAFKA-6555:
--

[~vinoth] I worked on it about a year back and haven't looked at it since. I 
can share it if it's of any use to you.

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask 
> is in RUNNING state. The idea is to make them queryable even in the RESTORATION 
> (PARTITION_ASSIGNED) state, as the time spent on restoration can be huge and 
> making the data inaccessible during this time could mean downtime that is not 
> acceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica 
> task has to restore the remaining state from the changelog topic before it can 
> become RUNNING. The time taken for this depends on how far the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, resulting in partition downtime. We 
> can make the state store partition queryable for the data already present in 
> the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from the 
> changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store non-queryable during this time 
> means downtime for the partition.
> This is a very important improvement, as it could significantly improve the 
> availability of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
>  
>  





[jira] [Assigned] (KAFKA-9040) Implement --all option for describing configs

2019-10-29 Thread Raymond Ng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raymond Ng reassigned KAFKA-9040:
-

Assignee: Raymond Ng  (was: Jason Gustafson)

> Implement --all option for describing configs
> -
>
> Key: KAFKA-9040
> URL: https://issues.apache.org/jira/browse/KAFKA-9040
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Raymond Ng
>Priority: Major
>
> Add an --all option to list all configs (not just the dynamic configs) using 
> the config tool. More details here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-524%3A+Allow+users+to+choose+config+source+when+describing+configs].
>  
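
For illustration, the new flag would be invoked roughly like this (a sketch based on KIP-524; check the KIP and the tool's help output for the final flag names):

{code}
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-name 0 --describe --all
{code}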





[jira] [Updated] (KAFKA-9112) Combine streams `onAssignment` with `partitionsAssigned` task creation

2019-10-29 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9112:
---
Component/s: streams

> Combine streams `onAssignment` with `partitionsAssigned` task creation
> --
>
> Key: KAFKA-9112
> URL: https://issues.apache.org/jira/browse/KAFKA-9112
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> Task manager needs to call `createTasks` inside the partitionsAssigned callback, 
> which comes after the assignor's `onAssignment` callback. This means that during 
> task creation we rely on status changes driven by intermediate data structures 
> populated by a different callback, which is hard to reason about. We should 
> consider consolidating the logic into one of the two callbacks, preferring 
> `onAssignment` as it contains the full information needed.





[jira] [Created] (KAFKA-9113) Clean up task management

2019-10-29 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-9113:
--

 Summary: Clean up task management
 Key: KAFKA-9113
 URL: https://issues.apache.org/jira/browse/KAFKA-9113
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.4.0
Reporter: Sophie Blee-Goldman


Along with KIP-429 we did a lot of refactoring of the task management classes, 
including the TaskManager and AssignedTasks (and children). While the result is 
hopefully easier to reason about, there's still significant opportunity for 
further cleanup, including safer state tracking. Some potential improvements:

1) Verify that no tasks are ever in more than one state at once. One 
possibility is to just check that the suspended, created, restoring, and 
running maps are all disjoint, but this begs the question of when and where to 
do those checks, and how often. Another idea might be to put all tasks into a 
single map and just track their state on a per-task basis (see the sketch 
after this list). Whatever it is should be aware that some methods are on the 
critical code path, and should not be burdened with excessive safety checks 
(e.g. AssignedStreamTasks#process).

2) Cleanup of closing and/or shutdown logic – there are some potential 
improvements to be made here as well; for example, AssignedTasks currently 
implements a closeZombieTask method despite the fact that standby tasks are 
never zombies.

3) The StoreChangelogReader also interacts with (only) the 
AssignedStreamsTasks class, through the TaskManager. It can be difficult to 
reason about these interactions and the state of the changelog reader.

4) All 4 classes and their state have very strict consistency requirements 
that currently are almost impossible to verify, which has already resulted in 
several bugs that we were lucky to catch in time. We should tighten up how 
these classes manage their own state, and how the overall state is managed 
between them, so that it is easy to make changes without introducing new bugs 
because one class updated its own state without knowing it needed to tell 
another class to also update its own.
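
Regarding the single-map idea in (1), a minimal hypothetical sketch (TaskId is the real Streams task identifier; everything else is illustrative, not actual Streams code):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.processor.TaskId;

// Hypothetical sketch: one map, one state per task, so "a task is in exactly
// one state" holds by construction instead of needing disjointness scans.
class TaskRegistrySketch {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private final Map<TaskId, State> states = new HashMap<>();

    void register(TaskId id) {
        states.put(id, State.CREATED);
    }

    // Cheap O(1) check, safe even on the critical path (cf. the concern
    // about AssignedStreamTasks#process above).
    void transition(TaskId id, State expected, State next) {
        State current = states.get(id);
        if (current != expected) {
            throw new IllegalStateException(
                "Task " + id + " is in state " + current + ", expected " + expected);
        }
        states.put(id, next);
    }
}
{code}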





[jira] [Commented] (KAFKA-9112) Combine streams `onAssignment` with `partitionsAssigned` task creation

2019-10-29 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962498#comment-16962498
 ] 

Sophie Blee-Goldman commented on KAFKA-9112:


I agree with this ticket 100% and had the same thought a million times while 
working on KIP-429. I would personally vote to combine the 
ConsumerRebalanceListener and ConsumerPartitionAssignor completely (or at least 
give a number of the assignor's current responsibilities over to the listener). 
In complex systems they need to share state and synchronize, which is made 
worse by the different listener callback semantics of the two rebalance 
protocols. 

eg in eager, onPartitionsRevoked is always called before 
ConsumerPartitionAssignor#subscriptionUserdata and therefore the assignor can 
count on whatever state being "updated" as of the new rebalance – but in a 
cooperative rebalance, ConsumerPartitionAssignor#subscriptionUserdata might be 
the first and only method called at the start of the rebalance (but it also 
might not be!)

The assignor's current role extends way beyond just assignment, and is just as 
responsible for updating state and reacting to rebalances as the listener. We 
should try and define a clear separation of responsibilities – maybe one class 
that reacts to the occurrence of a rebalance – regardless of protocol, ie 
through onRebalanceBegin and onRebalanceEnd for example – plus the usual 
callbacks to react to the specific partition-ownership events (eg 
onPartitionsRevoked). This one _reacts_ to a rebalance, keeps state, and tells 
the user/app (eg Streams) how to then update its own state and react 
accordingly – this is a true rebalance listener

And another class should be responsible for the actual assignment, such as 
which assignment strategy to support and encode, and of course implementing the 
actual assignment algorithm. This one should most likely be almost, if not 
entirely, stateless – any state it needs can come from the listener, but should 
not be stored or updated by this class. It doesn't react to events, it performs 
functions – assignment, or encoding the subscription, etc. – it's a pure 
assignor.

> Combine streams `onAssignment` with `partitionsAssigned` task creation
> --
>
> Key: KAFKA-9112
> URL: https://issues.apache.org/jira/browse/KAFKA-9112
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Major
>
> Task manager needs to call `createTasks` inside the partitionsAssigned callback, 
> which comes after the assignor's `onAssignment` callback. This means that during 
> task creation we rely on status changes driven by intermediate data structures 
> populated by a different callback, which is hard to reason about. We should 
> consider consolidating the logic into one of the two callbacks, preferring 
> `onAssignment` as it contains the full information needed.





[jira] [Created] (KAFKA-9112) Combine streams `onAssignment` with `partitionsAssigned` task creation

2019-10-29 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9112:
--

 Summary: Combine streams `onAssignment` with `partitionsAssigned` 
task creation
 Key: KAFKA-9112
 URL: https://issues.apache.org/jira/browse/KAFKA-9112
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


Task manager needs to call `createTasks` inside the partitionsAssigned callback, 
which comes after the assignor's `onAssignment` callback. This means that during 
task creation we rely on status changes driven by intermediate data structures 
populated by a different callback, which is hard to reason about. We should 
consider consolidating the logic into one of the two callbacks, preferring 
`onAssignment` as it contains the full information needed.
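
A hypothetical sketch of that consolidation, as it might look inside StreamsPartitionAssignor (AssignmentInfo.decode(...) and activeTasks() are existing Streams internals; taskManager.createTasks(...) is an assumed helper for illustration, not the actual code):

{code:java}
// Hypothetical: do task creation directly in the assignor's onAssignment,
// which already carries the full assignment, instead of staging data in
// intermediate structures for a later partitionsAssigned callback.
public void onAssignment(ConsumerPartitionAssignor.Assignment assignment,
                         ConsumerGroupMetadata metadata) {
    AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
    // Create the tasks right here, from complete information, instead of
    // populating intermediate maps that partitionsAssigned later reads.
    taskManager.createTasks(assignment.partitions(), info.activeTasks());
}
{code}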





[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-10-29 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962448#comment-16962448
 ] 

Bruno Cadonna commented on KAFKA-8630:
--

Agreed

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at 
> com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 

[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-10-29 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962413#comment-16962413
 ] 

Sophie Blee-Goldman commented on KAFKA-8630:


That's fair, I agree that would be the ideal resolution to this ticket and 
something we should do anyway. That said, I've been thinking lately that this 
ticket is actually pretty important, as without it users basically have no 
legitimate way of unit testing a stateful transformer, processor, etc. 

But if we aren't able to fix this issue holistically (i.e. resolve KAFKA-9109) 
by the next release cycle, I would vote to implement the quick-and-dirty 
solution I outlined in KAFKA-9088, just to give users something.

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor<String, String> {
>   private ProcessorContext context;
>   private WindowStore<String, String> windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore<String, String>) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore<String, String> store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> 

[jira] [Assigned] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-10-29 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-8630:
--

Assignee: Sophie Blee-Goldman

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor {
>   private ProcessorContext context;
>   private WindowStore windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at 
> com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> 

[jira] [Assigned] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-10-29 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-8630:
--

Assignee: (was: Sophie Blee-Goldman)

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor {
>   private ProcessorContext context;
>   private WindowStore windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:90)
> at 
> com.cantgetthistowork.InMemWindowProcessorTest.setup(InMemWindowProcessorTest.java:36)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> 

[jira] [Resolved] (KAFKA-9077) System Test Failure: StreamsSimpleBenchmarkTest

2019-10-29 Thread Bill Bejeck (Jira)


[jira] [Commented] (KAFKA-9077) System Test Failure: StreamsSimpleBenchmarkTest

2019-10-29 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962367#comment-16962367
 ] 

ASF GitHub Bot commented on KAFKA-9077:
---

bbejeck commented on pull request #7610: KAFKA-9077: Fix reading of metrics of 
Streams' SimpleBenchmark
URL: https://github.com/apache/kafka/pull/7610
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> System Test Failure: StreamsSimpleBenchmarkTest
> ---
>
> Key: KAFKA-9077
> URL: https://issues.apache.org/jira/browse/KAFKA-9077
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Affects Versions: 2.4.0
>Reporter: Manikumar
>Assignee: Bruno Cadonna
>Priority: Minor
> Fix For: 2.5.0
>
>
> StreamsSimpleBenchmarkTest tests are failing on 2.4 and trunk.
> http://confluent-kafka-2-4-system-test-results.s3-us-west-2.amazonaws.com/2019-10-21--001.1571716233--confluentinc--2.4--cb4944f/report.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2019-10-29 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962360#comment-16962360
 ] 

Sophie Blee-Goldman commented on KAFKA-9088:


Whoops, thanks for pointing that out. I went back and fixed the link in my 
comment. But to answer your other question, by "new 
MockInternalProcessorContext" I do mean whatever we end up with once this 
ticket is resolved. I'm not sure which version of the name we want to keep 
going forward (personally I think MockInternalProcessorContext makes more 
sense, but I don't have a strong preference).

> Consolidate InternalMockProcessorContext and MockInternalProcessorContext
> -
>
> Key: KAFKA-9088
> URL: https://issues.apache.org/jira/browse/KAFKA-9088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Bruno Cadonna
>Priority: Minor
>  Labels: newbie
>
> Currently, we have two mocks for the {{InternalProcessorContext}}. The goal 
> of this ticket is to merge both into one mock or replace it with an 
> {{EasyMock}} mock. 
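
A minimal sketch of the {{EasyMock}} option mentioned above, assuming EasyMock is on the test classpath; the stubbed {{taskId()}} is illustrative, not a required part of the consolidation:

{code:java}
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.easymock.EasyMock;

final class MockContexts {
    // A nice (lenient) mock: unstubbed calls return defaults instead of failing,
    // so tests only need to stub the methods they actually exercise.
    static InternalProcessorContext niceContext() {
        final InternalProcessorContext context = EasyMock.niceMock(InternalProcessorContext.class);
        EasyMock.expect(context.taskId()).andStubReturn(new TaskId(0, 0));
        EasyMock.replay(context);
        return context;
    }
}
{code}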



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2019-10-29 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16961475#comment-16961475
 ] 

Sophie Blee-Goldman edited comment on KAFKA-9088 at 10/29/19 7:55 PM:
--

[~mjsax] Here's a thought: what if we moved the new 
MockInternalProcessorContext to the test-utils package (internal, not public)? 
That could help us tackle this issue with unit testing window stores (KAFKA- 
[7594|https://github.com/apache/kafka/pull/7594]), as we could provide builder 
methods that return an initialized store using the internal processor context, 
so that users don't have to rely on internal objects in their own code in order 
to get a usable window store.


was (Author: ableegoldman):
[~mjsax] Here's a thought: what if we moved the new 
MockInternalProcessorContext to the test-utils package (internal, not public)? 
That could help us tackle this issue with unit testing window stores 
(KAFKA-9088), as we could provide builder methods that return an initialized 
store using the internal processor context, so that users don't have to rely on 
internal objects in their own code in order to get a usable window store.

> Consolidate InternalMockProcessorContext and MockInternalProcessorContext
> -
>
> Key: KAFKA-9088
> URL: https://issues.apache.org/jira/browse/KAFKA-9088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Bruno Cadonna
>Priority: Minor
>  Labels: newbie
>
> Currently, we have two mocks for the {{InternalProcessorContext}}. The goal 
> of this ticket is to merge both into one mock or replace it with an 
> {{EasyMock}} mock. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9069) Flaky Test AdminClientIntegrationTest.testCreatePartitions

2019-10-29 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962318#comment-16962318
 ] 

Bruno Cadonna commented on KAFKA-9069:
--

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/9024/testReport/junit/kafka.api/SslAdminClientIntegrationTest/testCreatePartitions/

> Flaky Test AdminClientIntegrationTest.testCreatePartitions
> --
>
> Key: KAFKA-9069
> URL: https://issues.apache.org/jira/browse/KAFKA-9069
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/2792/testReport/junit/kafka.api/AdminClientIntegrationTest/testCreatePartitions/]
> {quote}java.lang.AssertionError: validateOnly expected:<3> but was:<1> at 
> org.junit.Assert.fail(Assert.java:89) at 
> org.junit.Assert.failNotEquals(Assert.java:835) at 
> org.junit.Assert.assertEquals(Assert.java:647) at 
> kafka.api.AdminClientIntegrationTest.$anonfun$testCreatePartitions$5(AdminClientIntegrationTest.scala:651)
>  at 
> kafka.api.AdminClientIntegrationTest.$anonfun$testCreatePartitions$5$adapted(AdminClientIntegrationTest.scala:601)
>  at scala.collection.immutable.List.foreach(List.scala:305) at 
> kafka.api.AdminClientIntegrationTest.testCreatePartitions(AdminClientIntegrationTest.scala:601){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9077) System Test Failure: StreamsSimpleBenchmarkTest

2019-10-29 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962293#comment-16962293
 ] 

ASF GitHub Bot commented on KAFKA-9077:
---

cadonna commented on pull request #7610: KAFKA-9077: Fix reading of metrics of 
Streams' SimpleBenchmark
URL: https://github.com/apache/kafka/pull/7610
 
 
   With KIP-444 the metrics definitions are refactored. Thus, Streams' 
SimpleBenchmark needs to be updated to correctly access the refactored metrics.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> System Test Failure: StreamsSimpleBenchmarkTest
> ---
>
> Key: KAFKA-9077
> URL: https://issues.apache.org/jira/browse/KAFKA-9077
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Affects Versions: 2.4.0
>Reporter: Manikumar
>Assignee: Bruno Cadonna
>Priority: Minor
> Fix For: 2.5.0
>
>
> StreamsSimpleBenchmarkTest tests are failing on 2.4 and trunk.
> http://confluent-kafka-2-4-system-test-results.s3-us-west-2.amazonaws.com/2019-10-21--001.1571716233--confluentinc--2.4--cb4944f/report.html
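
A hedged sketch of the kind of lookup the benchmark needs after the KIP-444 refactoring: resolving metrics by group and name at runtime rather than hard-coding pre-KIP-444 names. The group/name arguments are placeholders, not the benchmark's actual queries:

{code:java}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

final class MetricLookup {
    // Scans KafkaStreams#metrics() for the first metric matching group and name.
    static double metricValue(final KafkaStreams streams, final String group, final String name) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            final MetricName metricName = entry.getKey();
            if (metricName.group().equals(group) && metricName.name().equals(name)) {
                return (double) entry.getValue().metricValue();
            }
        }
        throw new IllegalStateException("Metric not found: " + group + "/" + name);
    }
}
{code}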



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8972) KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback state

2019-10-29 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962265#comment-16962265
 ] 

ASF GitHub Bot commented on KAFKA-8972:
---

guozhangwang commented on pull request #7441: KAFKA-8972 (2.4 blocker): 
correctly release lost partitions during consumer.unsubscribe()
URL: https://github.com/apache/kafka/pull/7441
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer.unsubscribe could leave inconsistent user rebalance callback 
> state
> 
>
> Key: KAFKA-8972
> URL: https://issues.apache.org/jira/browse/KAFKA-8972
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
> Fix For: 2.4.0
>
>
> Our current implementation ordering of {{KafkaConsumer.unsubscribe}} is the 
> following:
> {code}
> this.subscriptions.unsubscribe();
> this.coordinator.onLeavePrepare();
> this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
> {code}
> And inside {{onLeavePrepare}} we would look into the assignment and try to 
> revoke them and notify users via {{RebalanceListener#onPartitionsRevoked}}, 
> and then clear the assignment.
> However, the subscription's assignment is already cleared in 
> {{this.subscriptions.unsubscribe();}} which means user's rebalance listener 
> would never be triggered. In other words, from consumer client's pov nothing 
> is owned after unsubscribe, but from the user caller's pov the partitions are 
> not revoked yet. For callers like Kafka Streams which rely on the rebalance 
> listener to maintain their internal state, this leads to inconsistent state 
> management and failure cases.
> Before KIP-429 this issue is hidden away since every time the consumer 
> re-joins the group later, it would still revoke everything anyways regardless 
> of the passed-in parameters of the rebalance listener; with KIP-429 this is 
> easier to reproduce now.
> I think we can summarize our fix as:
> • Inside `unsubscribe`, first do `onLeavePrepare / maybeLeaveGroup` and then 
> `subscription.unsubscribe`. This way we are guaranteed that the streams' tasks 
> are all closed as revoked by then (see the sketch after this list).
> • [Optimization] If the generation is reset due to a fatal error from a join / hb 
> response etc., then we know that all partitions are lost, and we should not 
> trigger `onPartitionsRevoked`, but instead just `onPartitionsLost` inside 
> `onLeavePrepare`.
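
A minimal sketch of the ordering from the first bullet; {{Coordinator}} and {{Subscriptions}} are stand-in interfaces for the real {{ConsumerCoordinator}} and {{SubscriptionState}}, not the actual patch:

{code:java}
interface Coordinator {
    void onLeavePrepare();
    void maybeLeaveGroup(String reason);
}

interface Subscriptions {
    void unsubscribe();
}

final class UnsubscribeOrdering {
    static void unsubscribe(final Coordinator coordinator, final Subscriptions subscriptions) {
        // Run the leave-group path (which fires the user rebalance callbacks)
        // while the assignment is still populated...
        coordinator.onLeavePrepare();
        coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
        // ...and only then clear the local subscription state.
        subscriptions.unsubscribe();
    }
}
{code}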



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-10-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962249#comment-16962249
 ] 

Matthias J. Sax commented on KAFKA-8677:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/3017/testReport/junit/kafka.api/SaslPlainSslEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/]

> Flakey test 
> GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> 
>
> Key: KAFKA-8677
> URL: https://issues.apache.org/jira/browse/KAFKA-8677
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Assignee: Anastasia Vela
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console]
>  
> *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > 
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* 
> kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00*
>  *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > 
> testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* 
> org.scalatest.exceptions.TestFailedException: Consumed 0 records before 
> timeout instead of the expected 1 records



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9059) Implement ReassignmentMaxLag

2019-10-29 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962152#comment-16962152
 ] 

ASF GitHub Bot commented on KAFKA-9059:
---

viktorsomogyi commented on pull request #7609: KAFKA-9059: Implement 
ReassignmentMaxLag
URL: https://github.com/apache/kafka/pull/7609
 
 
   This PR implements the 
kafka.server:type=ReplicaManager,name=ReassignmentMaxLag,clientId=Leader metric 
by keeping track of the reassignment lag in the follower.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement ReassignmentMaxLag
> 
>
> Key: KAFKA-9059
> URL: https://issues.apache.org/jira/browse/KAFKA-9059
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.5.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> Some more thinking is required for the implementation of ReassignmentMaxLag 
> proposed by 
> [KIP-352|https://cwiki.apache.org/confluence/display/KAFKA/KIP-352:+Distinguish+URPs+caused+by+reassignment]
>  as it's not so straightforward (we need to maintain the partitions' lag), so 
> we separated it off into this JIRA.
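
A hedged sketch (not the PR's code) of the bookkeeping the metric implies: track per-partition reassignment lag on the follower side and expose the maximum as a gauge value:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ReassignmentLagTracker {
    private final Map<String, Long> lagByPartition = new ConcurrentHashMap<>();

    // Called from the follower's fetch path with the partition's current lag.
    void record(final String topicPartition, final long lag) {
        lagByPartition.put(topicPartition, lag);
    }

    // The value behind kafka.server:type=ReplicaManager,name=ReassignmentMaxLag.
    long reassignmentMaxLag() {
        return lagByPartition.values().stream().mapToLong(Long::longValue).max().orElse(0L);
    }
}
{code}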



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9111) Incorrect project category in DOAP file, breaking projects.apache.org category listing

2019-10-29 Thread Nick Burch (Jira)
Nick Burch created KAFKA-9111:
-

 Summary: Incorrect project category in DOAP file, breaking 
projects.apache.org category listing
 Key: KAFKA-9111
 URL: https://issues.apache.org/jira/browse/KAFKA-9111
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Nick Burch


The Kafka DOAP file in git has the project category entered incorrectly. This 
means that the projects.apache.org "by Category" listing doesn't show Kafka in 
the right place.

I would expect to see Kafka under "Big Data" at 
[https://projects.apache.org/projects.html?category#big-data] , but it's 
actually under a broken/nested-looking entry at 
[https://projects.apache.org/projects.html?category#https://projects.apache.org/projects.html?category#big-data]

As per [https://projects.apache.org/guidelines.html] the category at 
[https://github.com/apache/kafka/blob/trunk/doap_Kafka.rdf#L36] should be a 
{{category}} element whose {{rdf:resource}} points at a projects.apache.org 
category, and so your big data category resource URI should be 
[http://projects.apache.org/category/big-data]
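
Based on the guidelines and the URI above, the fixed line in doap_Kafka.rdf would presumably look like:

{noformat}
<category rdf:resource="http://projects.apache.org/category/big-data" />
{noformat}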



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9110) Improve efficiency of disk reads when TLS is enabled

2019-10-29 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9110:
---
Fix Version/s: 2.5.0

> Improve efficiency of disk reads when TLS is enabled
> 
>
> Key: KAFKA-9110
> URL: https://issues.apache.org/jira/browse/KAFKA-9110
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.5.0
>
>
> We currently do 8k reads and do unnecessary copies and allocations in every 
> read. Increasing the read size is particularly helpful for magnetic disks and 
> avoiding the copies and allocations improves CPU efficiency.
> See the pull request for more details:
> [https://github.com/apache/kafka/pull/7604]
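
A hedged illustration of the two levers described above, not Kafka's actual code: read in chunks larger than 8k and reuse a single buffer per transfer instead of allocating on every read. The 64 KB size is an arbitrary stand-in:

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

final class ChunkedTransfer {
    private static final int READ_SIZE = 64 * 1024;                         // larger than 8k reads
    private final ByteBuffer buffer = ByteBuffer.allocateDirect(READ_SIZE); // allocated once, reused

    // Reads one chunk from the file at the given position and writes it to the sink.
    long transferChunk(final FileChannel source, final long position,
                       final WritableByteChannel sink) throws IOException {
        buffer.clear();
        final int read = source.read(buffer, position);
        if (read <= 0) {
            return 0;
        }
        buffer.flip();
        return sink.write(buffer);
    }
}
{code}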



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9110) Improve efficiency of disk reads when TLS is enabled

2019-10-29 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9110:
---
Description: 
We currently do 8k reads and do unnecessary copies and allocations in every 
read. Increasing the read size is particularly helpful for magnetic disks and 
avoiding the copies and allocations improves CPU efficiency.

See the pull request for more details:

[https://github.com/apache/kafka/pull/7604]

  was:
We currently do 8k reads and do unnecessary copies and allocations in every 
read. Increasing the read size is particularly helpful for magnetic disks and 
avoiding the copies and allocations improves CPU efficiency.

See the pull request for more details.


> Improve efficiency of disk reads when TLS is enabled
> 
>
> Key: KAFKA-9110
> URL: https://issues.apache.org/jira/browse/KAFKA-9110
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>
> We currently do 8k reads and do unnecessary copies and allocations in every 
> read. Increasing the read size is particularly helpful for magnetic disks and 
> avoiding the copies and allocations improves CPU efficiency.
> See the pull request for more details:
> [https://github.com/apache/kafka/pull/7604]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9110) Improve efficiency of disk reads when TLS is enabled

2019-10-29 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-9110:
--

 Summary: Improve efficiency of disk reads when TLS is enabled
 Key: KAFKA-9110
 URL: https://issues.apache.org/jira/browse/KAFKA-9110
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma


We currently do 8k reads and do unnecessary copies and allocations in every 
read. Increasing the read size is particularly helpful for magnetic disks and 
avoiding the copies and allocations improves CPU efficiency.

See the pull request for more details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6689) Kafka not release .deleted file.

2019-10-29 Thread Krithika Balu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16961959#comment-16961959
 ] 

Krithika Balu commented on KAFKA-6689:
--

I'll look into the issue and try to debug.

> Kafka not release .deleted file.
> 
>
> Key: KAFKA-6689
> URL: https://issues.apache.org/jira/browse/KAFKA-6689
> Project: Kafka
>  Issue Type: Bug
>  Components: config, controller, log
>Affects Versions: 0.10.1.1
> Environment: 3 Kafka Broker running on CentOS6.9(rungi3 VMs) 
>Reporter: A
>Priority: Critical
>  Labels: newbie
> Fix For: 0.10.1.1
>
>
> After Kafka cleaned up log .timeindex / .index files based on topic retention, 
> I can still lsof a lot of .index.deleted and .timeindex.deleted files.
> We have 3 brokers on 3 VMs; it happens on only 2 of the brokers.
> [brokeer-01 ~]$ lsof -p 24324 | grep deleted | wc -l
> 28
> [broker-02 ~]$ lsof -p 12131 | grep deleted | wc -l
> 14599
> [broker-03 ~]$ lsof -p 3349 | grep deleted | wc -l
> 14523
>  
> Configuration on the 3 brokers is the same (roll log every hour, retention 
> time 11 hours).
>  * INFO KafkaConfig values:
>  advertised.host.name = null
>  advertised.listeners = PLAINTEXT://Broker-02:9092
>  advertised.port = null
>  authorizer.class.name =
>  auto.create.topics.enable = true
>  auto.leader.rebalance.enable = true
>  background.threads = 10
>  broker.id = 2
>  broker.id.generation.enable = true
>  broker.rack = null
>  compression.type = producer
>  connections.max.idle.ms = 60
>  controlled.shutdown.enable = true
>  controlled.shutdown.max.retries = 3
>  controlled.shutdown.retry.backoff.ms = 5000
>  controller.socket.timeout.ms = 3
>  default.replication.factor = 3
>  delete.topic.enable = true
>  fetch.purgatory.purge.interval.requests = 1000
>  group.max.session.timeout.ms = 30
>  group.min.session.timeout.ms = 6000
>  host.name =
>  inter.broker.protocol.version = 0.10.1-IV2
>  leader.imbalance.check.interval.seconds = 300
>  leader.imbalance.per.broker.percentage = 10
>  listeners = null
>  log.cleaner.backoff.ms = 15000
>  log.cleaner.dedupe.buffer.size = 134217728
>  log.cleaner.delete.retention.ms = 8640
>  log.cleaner.enable = true
>  log.cleaner.io.buffer.load.factor = 0.9
>  log.cleaner.io.buffer.size = 524288
>  log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
>  log.cleaner.min.cleanable.ratio = 0.5
>  log.cleaner.min.compaction.lag.ms = 0
>  log.cleaner.threads = 1
>  log.cleanup.policy = [delete]
>  log.dir = /tmp/kafka-logs
>  log.dirs = /data/appdata/kafka/data
>  log.flush.interval.messages = 9223372036854775807
>  log.flush.interval.ms = null
>  log.flush.offset.checkpoint.interval.ms = 6
>  log.flush.scheduler.interval.ms = 9223372036854775807
>  log.index.interval.bytes = 4096
>  log.index.size.max.bytes = 10485760
>  log.message.format.version = 0.10.1-IV2
>  log.message.timestamp.difference.max.ms = 9223372036854775807
>  log.message.timestamp.type = CreateTime
>  log.preallocate = false
>  log.retention.bytes = -1
>  log.retention.check.interval.ms = 30
>  log.retention.hours = 11
>  log.retention.minutes = 660
>  log.retention.ms = 3960
>  log.roll.hours = 1
>  log.roll.jitter.hours = 0
>  log.roll.jitter.ms = null
>  log.roll.ms = null
>  log.segment.bytes = 1073741824
>  log.segment.delete.delay.ms = 6
>  max.connections.per.ip = 2147483647
>  max.connections.per.ip.overrides =
>  message.max.bytes = 112
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.sample.window.ms = 3
>  min.insync.replicas = 2
>  num.io.threads = 16
>  num.network.threads = 16
>  num.partitions = 10
>  num.recovery.threads.per.data.dir = 3
>  num.replica.fetchers = 1
>  offset.metadata.max.bytes = 4096
>  offsets.commit.required.acks = -1
>  offsets.commit.timeout.ms = 5000
>  offsets.load.buffer.size = 5242880
>  offsets.retention.check.interval.ms = 60
>  offsets.retention.minutes = 1440
>  offsets.topic.compression.codec = 0
>  offsets.topic.num.partitions = 50
>  offsets.topic.replication.factor = 3
>  offsets.topic.segment.bytes = 104857600
>  port = 9092
>  principal.builder.class = class 
> org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
>  producer.purgatory.purge.interval.requests = 1000
>  queued.max.requests = 3
>  quota.consumer.default = 9223372036854775807
>  quota.producer.default = 9223372036854775807
>  quota.window.num = 11
>  quota.window.size.seconds = 1
>  replica.fetch.backoff.ms = 1000
>  replica.fetch.max.bytes = 1048576
>  replica.fetch.min.bytes = 1
>  replica.fetch.response.max.bytes = 10485760
>  replica.fetch.wait.max.ms = 500
>  replica.high.watermark.checkpoint.interval.ms = 5000
>  replica.lag.time.max.ms = 1
>  replica.socket.receive.buffer.bytes = 65536
>  replica.socket.timeout.ms 

[jira] [Commented] (KAFKA-8010) kafka-configs.sh does not allow setting config with an equal in the value

2019-10-29 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16961918#comment-16961918
 ] 

Stanislav Kozlovski commented on KAFKA-8010:


I also hit this issue and verified that using square brackets fixes it. Perhaps 
we should document this better in the kafka-configs help string?
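
For reference, the square-bracket workaround applied to the command from the report below looks roughly like this (a sketch; only the bracketing is the point):

{noformat}
./kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers \
  --entity-name 59 --alter --add-config \
  'sasl.jaas.config=[org.apache.kafka.common.security.plain.PlainLoginModule required username="myuser" password="mypassword";]'
{noformat}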

> kafka-configs.sh does not allow setting config with an equal in the value
> -
>
> Key: KAFKA-8010
> URL: https://issues.apache.org/jira/browse/KAFKA-8010
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Mickael Maison
>Priority: Major
> Attachments: image-2019-03-05-19-45-47-168.png
>
>
> The sasl.jaas.config typically includes equals in its value. Unfortunately 
> the kafka-configs tool does not parse such values correctly and hits an error:
> ./kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
> --entity-name 59 --alter --add-config "sasl.jaas.config=KafkaServer \{\n  
> org.apache.kafka.common.security.plain.PlainLoginModule required\n  
> username=\"myuser\"\n  password=\"mypassword\";\n};\nClient \{\n  
> org.apache.zookeeper.server.auth.DigestLoginModule required\n  
> username=\"myuser2\"\n  password=\"mypassword2\;\n};"
> requirement failed: Invalid entity config: all configs to be added must be in 
> the format "key=val"



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8353) org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms after enabling SASL PLAINTEXT authentication

2019-10-29 Thread Fangwei Duan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16961881#comment-16961881
 ] 

Fangwei Duan commented on KAFKA-8353:
-

I had this problem too, and it finally turned out that the client server's time 
differed from the KDC server's time by more than 5 minutes. After syncing the 
time, the problem was solved.

> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 60000 ms after enabling SASL PLAINTEXT authentication
> --
>
> Key: KAFKA-8353
> URL: https://issues.apache.org/jira/browse/KAFKA-8353
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, security
>Affects Versions: 0.10.2.1
>Reporter: goutham
>Priority: Critical
>  Labels: security
>
> I'm running into a timeout exception when I try to run a producer and consumer 
> through Java or the console.
>  
> *kafka server.properties*
> advertised.host.name=127.0.0.1
>  
> listeners=SASL_PLAINTEXT://127.0.0.1:9090
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> advertised.listeners=SASL_PLAINTEXT://127.0.0.1:9090
>  
> *kafka server jaas conf*
>  
> KafkaServer {  
> org.apache.kafka.common.security.plain.PlainLoginModule required
>    username="admin"
>    password="admin"
>    user_admin="admin"
>    user_test="test";
>  
> };
>  
>  
> *client producer/consumer properties* 
>  
>  
> String jaasTemplate =
>     "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
> String jaasCfg = String.format(jaasTemplate, "test", "test");
> brokers.delete(brokers.length() - 1, brokers.length());
> properties.put("bootstrap.servers", brokers.toString());
> properties.put("retry.backoff.ms", "1000");
> properties.put("reconnect.backoff.ms", "1000");
> properties.put("max.request.size", "5242880");
> properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> // properties.put("metadata.max.age.ms", 15000); // refresh topic partition leadership every 15 seconds
> properties.put("sasl.jaas.config", jaasCfg);
> properties.put("security.protocol", "SASL_PLAINTEXT");
> properties.put("sasl.mechanism", "PLAIN");
> properties.put("ssl.client.auth", "none");
> I also added an environment variable KAFKA_OPTS with the JAAS config location 
> so the console consumer can use that login module. I am running single-node 
> Kafka (0.10.2) with ZooKeeper (3.4.9). With these settings both the broker and 
> ZooKeeper come up, but clients with valid credentials are not able to write to 
> or read from the broker. I pretty much followed the steps in the Apache Kafka 
> documentation. Please advise.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9077) System Test Failure: StreamsSimpleBenchmarkTest

2019-10-29 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-9077:


Assignee: Bruno Cadonna

> System Test Failure: StreamsSimpleBenchmarkTest
> ---
>
> Key: KAFKA-9077
> URL: https://issues.apache.org/jira/browse/KAFKA-9077
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Affects Versions: 2.4.0
>Reporter: Manikumar
>Assignee: Bruno Cadonna
>Priority: Minor
> Fix For: 2.5.0
>
>
> StreamsSimpleBenchmarkTest tests are failing on 2.4 and trunk.
> http://confluent-kafka-2-4-system-test-results.s3-us-west-2.amazonaws.com/2019-10-21--001.1571716233--confluentinc--2.4--cb4944f/report.html



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8977) Remove MockStreamsMetrics Since it is not a Mock

2019-10-29 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16961829#comment-16961829
 ] 

Bruno Cadonna commented on KAFKA-8977:
--

I see your point. Isn't it possible to come up with a common setup for a 
`StreamsMetricsImpl` mock and externalise it, so that this mock can be reused 
in multiple tests? 
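
A minimal sketch of that idea, assuming EasyMock class mocking is acceptable: a shared helper owns the common setup and individual tests layer their own expectations on top before calling {{EasyMock.replay}}:

{code:java}
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.easymock.EasyMock;

final class StreamsMetricsMocks {
    // Externalised common setup: returns an un-replayed lenient mock so each
    // test can still add the expectations it cares about.
    static StreamsMetricsImpl baseMock() {
        return EasyMock.niceMock(StreamsMetricsImpl.class);
    }
}
{code}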

> Remove MockStreamsMetrics Since it is not a Mock
> 
>
> Key: KAFKA-8977
> URL: https://issues.apache.org/jira/browse/KAFKA-8977
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: bibin sebastian
>Priority: Minor
>  Labels: newbie
>
> The class {{MockStreamsMetrics}} is used throughout unit tests as a mock but 
> it is not really a mock since it only hides two parameters of the 
> {{StreamsMetricsImpl}} constructor. Either a real mock or the real 
> {{StreamsMetricsImpl}} should be used in the tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8630) Unit testing a streams processor with a WindowStore throws a ClassCastException

2019-10-29 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16961820#comment-16961820
 ] 

Bruno Cadonna commented on KAFKA-8630:
--

I also ran into the {{ClassCastException}} during development and had to work 
around it. While I agree that the fastest way to solve the specific issue of 
this ticket is on the test side, I also feel that it is somewhat awkward to get 
a {{ClassCastException}} when your code conforms to the specified APIs. The 
cast to {{InternalProcessorContext}} is problematic and we should get rid of 
it. Thus, I filed KAFKA-9109.
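
In the meantime, one possible workaround (a sketch, not an official recommendation) is to exercise the processor through {{TopologyTestDriver}}, which supplies a real internal context, instead of wiring the store to {{MockProcessorContext}} by hand. This reuses the reporter's {{InMemWindowProcessor}} and the 2.3-era {{ConsumerRecordFactory}} API:

{code:java}
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class InMemWindowProcessorDriverTest {
  public static void main(String[] args) {
    Topology topology = new Topology();
    topology.addSource("source", "input-topic");
    topology.addProcessor("proc", InMemWindowProcessor::new, "source");
    topology.addStateStore(
        Stores.windowStoreBuilder(
            Stores.inMemoryWindowStore("my-win-store", Duration.ofMinutes(10),
                Duration.ofSeconds(10), false),
            Serdes.String(), Serdes.String()
        ).withLoggingDisabled(),
        "proc");

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inmem-window-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the driver

    ConsumerRecordFactory<String, String> factory =
        new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
    // init() runs against a real InternalProcessorContext, so the cast succeeds.
    try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
      driver.pipeInput(factory.create("key1", "value1"));
    }
  }
}
{code}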

> Unit testing a streams processor with a WindowStore throws a 
> ClassCastException
> ---
>
> Key: KAFKA-8630
> URL: https://issues.apache.org/jira/browse/KAFKA-8630
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 2.3.0
>Reporter: Justin Fetherolf
>Priority: Major
>
> I was attempting to write a unit test for a class implementing the 
> {{Processor}} interface that contained a {{WindowStore}}, but running the 
> test fails with a {{ClassCastException}} coming out of 
> {{InMemoryWindowStore.init}} attempting to cast {{MockProcessorContext}} to 
> {{InternalProcessorContext}}.
> Minimal code to reproduce:
> {code:java}
> package com.cantgetthistowork;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.state.WindowStore;
> public class InMemWindowProcessor implements Processor {
>   private ProcessorContext context;
>   private WindowStore windowStore;
>   @Override
>   public void init(ProcessorContext context) {
> this.context = context;
> windowStore = (WindowStore) 
> context.getStateStore("my-win-store");
>   }
>   @Override
>   public void process(String key, String value) {
>   }
>   @Override
>   public void close() {
>   }
> }
> {code}
> {code:java}
> package com.cantgetthistowork;
> import java.time.Duration;
> import java.time.Instant;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.processor.MockProcessorContext;
> import org.apache.kafka.streams.state.Stores;
> import org.apache.kafka.streams.state.WindowStore;
> import org.junit.Before;
> import org.junit.Test;
> public class InMemWindowProcessorTest {
>   InMemWindowProcessor processor = null;
>   MockProcessorContext context = null;
>   @Before
>   public void setup() {
> processor = new InMemWindowProcessor();
> context = new MockProcessorContext();
> WindowStore store =
>   Stores.windowStoreBuilder(
> Stores.inMemoryWindowStore(
>   "my-win-store",
>   Duration.ofMinutes(10),
>   Duration.ofSeconds(10),
>   false
> ),
> Serdes.String(),
> Serdes.String()
>   )
>   .withLoggingDisabled()
>   .build();
> store.init(context, store);
> context.register(store, null);
> processor.init(context);
>   }
>   @Test
>   public void testThings() {
> Instant baseTime = Instant.now();
> context.setTimestamp(baseTime.toEpochMilli());
> context.setTopic("topic-name");
> processor.process("key1", "value1");
>   }
> }
> {code}
>  
> I was trying this with maven, with mvn --version outputting:
> {noformat}
> Apache Maven 3.5.0 (ff8f5e7444045639af65f6095c62210b5713f426; 
> 2017-04-03T13:39:06-06:00)
> Maven home: ~/opt/apache-maven-3.5.0
> Java version: 1.8.0_212, vendor: Oracle Corporation
> Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
> Default locale: en_US, platform encoding: UTF-8
> OS name: "linux", version: "4.15.0-52-generic", arch: "amd64", family: 
> "unix"{noformat}
> And finally the stack trace:
> {noformat}
> ---
>  T E S T S
> ---
> Running com.cantgetthistowork.InMemWindowProcessorTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
> details.
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.076 sec <<< 
> FAILURE!
> testThings(com.cantgetthistowork.InMemWindowProcessorTest)  Time elapsed: 
> 0.05 sec  <<< ERROR!
> java.lang.ClassCastException: 
> org.apache.kafka.streams.processor.MockProcessorContext cannot be cast to 
> org.apache.kafka.streams.processor.internals.InternalProcessorContext
> at 
> org.apache.kafka.streams.state.internals.InMemoryWindowStore.init(InMemoryWindowStore.java:91)
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
> at 
> 

[jira] [Created] (KAFKA-9109) Get Rid of Cast from ProcessorContext to InternalProcessorContext

2019-10-29 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-9109:


 Summary: Get Rid of Cast from ProcessorContext to 
InternalProcessorContext
 Key: KAFKA-9109
 URL: https://issues.apache.org/jira/browse/KAFKA-9109
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


The following cast is often used in Kafka Streams code.

{code:java}
public void init(final ProcessorContext context) {
internalProcessorContext = (InternalProcessorContext) context;
...
}
{code}

This code leads to a {{ClassCastException}} if the implementation of the 
{{ProcessorContext}} is not an {{InternalProcessorContext}}, which defeats the 
purpose of using the {{ProcessorContext}} interface in the API.
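
Until the cast is removed, a small helper at least makes the implicit precondition explicit (a sketch only, reusing the types from the snippet above):

{code:java}
// Fail fast with an actionable message instead of an opaque ClassCastException
// when a caller passes a context (e.g. a test mock) that is not internal.
static InternalProcessorContext asInternal(final ProcessorContext context) {
    if (!(context instanceof InternalProcessorContext)) {
        throw new IllegalArgumentException(
            "Expected an InternalProcessorContext but got " + context.getClass().getName());
    }
    return (InternalProcessorContext) context;
}
{code}

{{init}} would then assign {{internalProcessorContext = asInternal(context)}}.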



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2019-10-29 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-8733:
--
Description: 
We found offline partitions issue multiple times on some of the hosts in our 
clusters. After going through the broker logs and hosts' disk stats, it looks 
like this issue occurs whenever the read/write operations take more time on 
that disk. In a particular case where read time is more than the 
replica.lag.time.max.ms, follower replicas will be out of sync as their earlier 
fetch requests are stuck while reading the local log and their fetch status is 
not yet updated as mentioned in the below code of `ReplicaManager`. If there is 
an issue in reading the data from the log for a duration more than 
replica.lag.time.max.ms then all the replicas will be out of sync and partition 
becomes offline if min.isr.replicas > 1 and unclean.leader.election is false.

 
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than 
`replica.lag.time.max.ms`
  replicaId = replicaId,
  fetchOnlyFromLeader = fetchOnlyFromLeader,
  readOnlyCommitted = fetchOnlyCommitted,
  fetchMaxBytes = fetchMaxBytes,
  hardMaxBytesLimit = hardMaxBytesLimit,
  readPartitionInfo = fetchInfos,
  quota = quota,
  isolationLevel = isolationLevel)
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result) // fetch 
time gets updated here, but maybeShrinkIsr should have been already called and 
the replica is removed from the isr
 else result
 }

val logReadResults = readFromLog()
{code}
Attached the graphs of disk weighted io time stats when this issue occurred.

I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how to 
handle this scenario.

 

  was:
We found offline partitions issue multiple times on some of the hosts in our 
clusters. After going through the broker logs and hosts' disk stats, it looks 
like this issue occurs whenever the read/write operations take more time on 
that disk. In a particular case where read time is more than the 
replica.lag.time.max.ms, follower replicas will be out of sync as their earlier 
fetch requests are stuck while reading the local log and their fetch status is 
not yet updated as mentioned in the below code of `ReplicaManager`. If there is 
an issue in reading the data from the log for a duration more than 
replica.lag.time.max.ms then all the replicas will be out of sync and partition 
becomes offline if min.isr.replicas > 1 and unclean.leader.election is false.

 
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than 
`replica.lag.time.max.ms`
  replicaId = replicaId,
  fetchOnlyFromLeader = fetchOnlyFromLeader,
  readOnlyCommitted = fetchOnlyCommitted,
  fetchMaxBytes = fetchMaxBytes,
  hardMaxBytesLimit = hardMaxBytesLimit,
  readPartitionInfo = fetchInfos,
  quota = quota,
  isolationLevel = isolationLevel)
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result) // fetch 
time gets updated here, but maybeShrinkIsr should have been already called and 
the replica is removed from the isr
 else result
 }

val logReadResults = readFromLog()
{code}
Attached the graphs of disk weighted io time stats when this issue occurred.

I will raise a 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time]
 describing options on how to handle this scenario.

 


> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and hosts' disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out of sync and 
> partition becomes offline if 

[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2019-10-29 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16961786#comment-16961786
 ] 

Bruno Cadonna commented on KAFKA-9088:
--

[~ableegoldman] Ah OK, I now saw the related issue under "Issue Links".

> Consolidate InternalMockProcessorContext and MockInternalProcessorContext
> -
>
> Key: KAFKA-9088
> URL: https://issues.apache.org/jira/browse/KAFKA-9088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Bruno Cadonna
>Priority: Minor
>  Labels: newbie
>
> Currently, we have two mocks for the {{InternalProcessorContext}}. The goal 
> of this ticket is to merge both into one mock or replace it with an 
> {{EasyMock}} mock. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2019-10-29 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16961780#comment-16961780
 ] 

Bruno Cadonna commented on KAFKA-9088:
--

[~ableegoldman] I do not completely follow here, also because the Jira issue 
linked in your comment links to this issue. What do you mean by "new 
MockInternalProcessorContext"? The one that is implemented as a result of this 
ticket?

> Consolidate InternalMockProcessorContext and MockInternalProcessorContext
> -
>
> Key: KAFKA-9088
> URL: https://issues.apache.org/jira/browse/KAFKA-9088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Bruno Cadonna
>Priority: Minor
>  Labels: newbie
>
> Currently, we have two mocks for the {{InternalProcessorContext}}. The goal 
> of this ticket is to merge both into one mock or replace it with an 
> {{EasyMock}} mock. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)