[jira] [Commented] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions

2018-03-15 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401411#comment-16401411
 ] 

huxihx commented on KAFKA-6666:
---

The exception is caused by the fact that the high watermark was set to -1, 
which should not be allowed to happen. 
[KAFKA-3978|https://issues.apache.org/jira/browse/KAFKA-3978] claimed to 
resolve the HW issue; however, no final conclusion has yet been reached on 
that issue.
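
For context, the reported error comes from a guard on log start offset moves. 
Below is a minimal Java paraphrase of that check (the actual broker code is 
Scala, and the method and parameter names here are illustrative, not the real 
source): with the high watermark stuck at -1, any positive target offset trips 
the check and kills the replica fetcher thread.

{code:java}
import org.apache.kafka.common.errors.OffsetOutOfRangeException;

class LogStartOffsetGuard {
    // Illustrative paraphrase, not the broker's actual (Scala) source:
    // the log start offset may never move past the high watermark.
    static void maybeIncrementLogStartOffset(long newStartOffset,
                                             long highWatermark,
                                             String partition) {
        if (newStartOffset > highWatermark) {
            // With highWatermark == -1, every positive newStartOffset lands here.
            throw new OffsetOutOfRangeException(
                "Cannot increment the log start offset to " + newStartOffset
                    + " of partition " + partition
                    + " since it is larger than the high watermark " + highWatermark);
        }
        // ...otherwise the log start offset advances normally.
    }
}
{code}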

> OffsetOutOfRangeException: Replica Thread Stopped Resulting in 
> Underreplicated Partitions
> -
>
> Key: KAFKA-6666
> URL: https://issues.apache.org/jira/browse/KAFKA-6666
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Srinivas Dhruvakumar
>Priority: Critical
> Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png
>
>
> Hello All, 
> We are currently seeing a few under-replicated partitions on our test cluster, 
> which is used for integration testing. On further debugging, we found that the 
> replica thread was stopped due to an error:
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 50 of partition  since it is larger 
> than the high watermark -1
> Kindly find the attached screenshot. 
> !Screen Shot 2018-03-15 at 3.52.13 PM.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6663) Expression for GlobalKTable is not correct

2018-03-15 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-6663:
-

Assignee: huxihx

> Expression for GlobalKTable is not correct
> --
>
> Key: KAFKA-6663
> URL: https://issues.apache.org/jira/browse/KAFKA-6663
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>  Labels: documentation
>
> In [this stream doc 
> section|https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api#creating-source-streams-from-kafka],
>   when reading records from Kafka to a global KTable, the doc says:
> `In the case of a GlobalKTable, the local GlobalKTable instance of every 
> application instance will be populated with data from only a *subset* of the 
> partitions of the input topic. Collectively, across all application 
> instances, all input topic partitions are read and processed.`
> Is that correct? Is each GlobalKTable instance only assigned a subset of 
> the partitions of the input topic? As I recall, it should be able to consume 
> all the partitions of the input topic.
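
For reference, a minimal sketch contrasting the two table types (topic names 
are placeholders): a KTable is sharded across application instances, while a 
GlobalKTable is fully replicated, which is the behavior the reporter describes.

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KTable;

public class TableTypes {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Sharded: each application instance reads only its assigned subset
        // of the input topic's partitions.
        KTable<String, String> sharded = builder.table("input-topic");
        // Replicated: every application instance reads ALL partitions of the
        // input topic into its local GlobalKTable.
        GlobalKTable<String, String> replicated = builder.globalTable("global-input-topic");
    }
}
{code}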



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6663) Expression for GlobalKTable is not correct

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401407#comment-16401407
 ] 

ASF GitHub Bot commented on KAFKA-6663:
---

huxihx opened a new pull request #4723: KAFKA-6663: Doc for `GlobalKTable` 
should be corrected.
URL: https://github.com/apache/kafka/pull/4723
 
 
   https://issues.apache.org/jira/browse/KAFKA-6663
   
   The doc should be refined to state that a GlobalKTable consumes all the 
partitions of the input topic.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Expression for GlobalKTable is not correct
> --
>
> Key: KAFKA-6663
> URL: https://issues.apache.org/jira/browse/KAFKA-6663
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: huxihx
>Priority: Minor
>  Labels: documentation
>
> In [this stream doc 
> section|https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api#creating-source-streams-from-kafka],
>   when reading records from Kafka to a global KTable, the doc says:
> `In the case of a GlobalKTable, the local GlobalKTable instance of every 
> application instance will be populated with data from only a *subset* of the 
> partitions of the input topic. Collectively, across all application 
> instances, all input topic partitions are read and processed.`
> Is that correct? Is each GlobalKTable instance only assigned a subset of 
> the partitions of the input topic? As I recall, it should be able to consume 
> all the partitions of the input topic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions

2018-03-15 Thread Srinivas Dhruvakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401387#comment-16401387
 ] 

Srinivas Dhruvakumar commented on KAFKA-6666:
-

Looking at the other issue. Is there any fix for it? Or just catching the 
exception?

> OffsetOutOfRangeException: Replica Thread Stopped Resulting in 
> Underreplicated Partitions
> -
>
> Key: KAFKA-6666
> URL: https://issues.apache.org/jira/browse/KAFKA-6666
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Srinivas Dhruvakumar
>Priority: Critical
> Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png
>
>
> Hello All, 
> We are currently seeing a few under-replicated partitions on our test cluster, 
> which is used for integration testing. On further debugging, we found that the 
> replica thread was stopped due to an error:
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 50 of partition  since it is larger 
> than the high watermark -1
> Kindly find the attached screenshot. 
> !Screen Shot 2018-03-15 at 3.52.13 PM.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions

2018-03-15 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401353#comment-16401353
 ] 

huxihx commented on KAFKA-6666:
---

Seems it's a duplicate of 
[KAFKA-6649|https://issues.apache.org/jira/browse/KAFKA-6649].

> OffsetOutOfRangeException: Replica Thread Stopped Resulting in 
> Underreplicated Partitions
> -
>
> Key: KAFKA-6666
> URL: https://issues.apache.org/jira/browse/KAFKA-6666
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Srinivas Dhruvakumar
>Priority: Critical
> Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png
>
>
> Hello All, 
> We are currently seeing a few under-replicated partitions on our test cluster, 
> which is used for integration testing. On further debugging, we found that the 
> replica thread was stopped due to an error:
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 50 of partition  since it is larger 
> than the high watermark -1
> Kindly find the attached screenshot. 
> !Screen Shot 2018-03-15 at 3.52.13 PM.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401313#comment-16401313
 ] 

ASF GitHub Bot commented on KAFKA-3978:
---

lindong28 opened a new pull request #4722: MINOR KAFKA-3978 followup
URL: https://github.com/apache/kafka/pull/4722
 
 
   We should use logStartOffset as HW offset if the current HW offset is out of 
range.
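
   A minimal sketch of that bounding rule, with illustrative names (this is 
not the PR's actual code): a high watermark recovered outside the valid range, 
such as the -1 seen in KAFKA-6666, falls back to logStartOffset.

{code:java}
// A sketch only: names are illustrative and this is not the PR's code.
class HighWatermarkBound {
    static long initialHighWatermark(long checkpointedHw,
                                     long logStartOffset,
                                     long logEndOffset) {
        if (checkpointedHw < logStartOffset || checkpointedHw > logEndOffset) {
            // Out-of-range checkpoint (e.g. -1): fall back to logStartOffset.
            return logStartOffset;
        }
        return checkpointedHw;
    }
}
{code}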
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cannot truncate to a negative offset (-1) exception at broker startup
> -
>
> Key: KAFKA-3978
> URL: https://issues.apache.org/jira/browse/KAFKA-3978
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
> Environment: 3.13.0-87-generic 
>Reporter: Juho Mäkinen
>Assignee: Dong Lin
>Priority: Critical
>  Labels: reliability, startup
> Fix For: 1.1.0
>
>
> During broker startup sequence the broker server.log has this exception. 
> Problem persists after multiple restarts and also on another broker in the 
> cluster.
> {code}
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> INFO Kafka commitId : b8642491e78c5a13 
> (org.apache.kafka.common.utils.AppInfoParser)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> Error when handling request 
> 

[jira] [Commented] (KAFKA-6651) SchemaBuilder should not allow Arrays or Maps to be created by type()

2018-03-15 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401270#comment-16401270
 ] 

Ewen Cheslack-Postava commented on KAFKA-6651:
--

Could go either way. Based on the comment, I think I originally exposed that 
constructor to support use cases that might do something like maintain a 
Map to dynamically generate schemas. I don't know 
that any actually do that in practice (though since you encountered this 
[~jcustenborder] maybe you are using a pattern like that?). I think most that 
need something dynamic end up just using a big switch statement with cases for 
each type. If nobody uses it, the other option would be to deprecate and remove 
that constructor.
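
For illustration, a sketch of the "big switch statement" pattern mentioned 
above, routing ARRAY and MAP to their dedicated builders since 
SchemaBuilder.type() alone cannot carry the element/key/value schemas those 
types require. The schemas chosen for the ARRAY and MAP cases are placeholders.

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class DynamicSchemas {
    // Routes ARRAY and MAP to their dedicated builders; the element, key,
    // and value schemas below are placeholders for illustration.
    public static SchemaBuilder builderFor(Schema.Type type) {
        switch (type) {
            case ARRAY:
                return SchemaBuilder.array(Schema.STRING_SCHEMA);
            case MAP:
                return SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT64_SCHEMA);
            case STRUCT:
                return SchemaBuilder.struct();
            default:
                return SchemaBuilder.type(type); // primitive types
        }
    }
}
{code}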

> SchemaBuilder should not allow Arrays or Maps to be created by type()
> -
>
> Key: KAFKA-6651
> URL: https://issues.apache.org/jira/browse/KAFKA-6651
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Priority: Minor
>
> The following code should throw an exception because we cannot set 
> valueSchema() or keySchema() once the builder is returned. 
> {code:java}
> SchemaBuilder.type(Schema.Type.ARRAY);
> SchemaBuilder.type(Schema.Type.MAP);{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-15 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401263#comment-16401263
 ] 

Jason Gustafson commented on KAFKA-6649:


[~julion] The ultimate failure sequence may be different, but I think the 
underlying cause is the fact that the high watermark could be incorrectly set 
to -1 in some scenarios. I suspect that the fix for that issue will fix this 
issue as well, but I'm not sure. That fix will get into 1.1, so if it is 
consistently reproducible, maybe you could test with the current RC? Here is a 
link to the artifacts: http://home.apache.org/~damianguy/kafka-1.1.0-rc3/.

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic 
> creation. After digging into the logs, we see the following:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like that after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Further examining, the ShutdownableThread.scala object:
> {noformat}
> override def run(): Unit = {
>   info("Starting")
>   try {
>     while (isRunning)
>       doWork()
>   } catch {
>     case e: FatalExitError =>
>       shutdownInitiated.countDown()
>       shutdownComplete.countDown()
>       info("Stopped")
>       Exit.exit(e.statusCode())
>     case e: Throwable =>
>       if (isRunning)
>         error("Error due to", e)
>   } finally {
>     shutdownComplete.countDown()
>   }
>   info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of ShutdownableThread, or whether the exception should be caught and we 
> should keep calling doWork().
>  
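
For comparison, a minimal Java sketch (the original class is Scala) of the 
alternative behavior the reporter asks about, where non-fatal exceptions are 
logged and doWork() keeps being called; whether that policy is safe is 
exactly the open question above.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class RetryingShutdownableThread extends Thread {
    private static final Logger log =
        LoggerFactory.getLogger(RetryingShutdownableThread.class);
    protected volatile boolean running = true;

    abstract void doWork() throws Exception;

    @Override
    public void run() {
        while (running) {
            try {
                doWork();
            } catch (Exception e) {
                // Non-fatal: log and keep fetching instead of letting the
                // thread die. Truly fatal errors would still need to escape.
                log.error("Error due to", e);
            }
        }
        log.info("Stopped");
    }
}
{code}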



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup

2018-03-15 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-3978:
---
Fix Version/s: (was: 1.2.0)
   1.1.0

> Cannot truncate to a negative offset (-1) exception at broker startup
> -
>
> Key: KAFKA-3978
> URL: https://issues.apache.org/jira/browse/KAFKA-3978
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
> Environment: 3.13.0-87-generic 
>Reporter: Juho Mäkinen
>Assignee: Dong Lin
>Priority: Critical
>  Labels: reliability, startup
> Fix For: 1.1.0
>
>
> During broker startup sequence the broker server.log has this exception. 
> Problem persists after multiple restarts and also on another broker in the 
> cluster.
> {code}
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> INFO Kafka commitId : b8642491e78c5a13 
> (org.apache.kafka.common.utils.AppInfoParser)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> Error when handling request 
> {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT 
> SNIPPED AWAY..], 
> live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]}
>  (kafka.server.KafkaApis)
> ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative 
> offset (-1).
> at kafka.log.Log.truncateTo(Log.scala:731)
> at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288)
> at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at 
> 

[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-15 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401255#comment-16401255
 ] 

Ewen Cheslack-Postava commented on KAFKA-6661:
--

Oh, also, I marked it as 1.1.1 release, but if we end up doing another RC we'll 
want to adjust this to 1.1.0.

> Sink connectors that explicitly 'resume' topic partitions can resume a paused 
> task
> --
>
> Key: KAFKA-6661
> URL: https://issues.apache.org/jira/browse/KAFKA-6661
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
> Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to 
> explicitly pause and resume topic partitions. This is useful when connectors 
> need additional time processing the records for specific topic partitions 
> (e.g., the external system has an outage).
> However, when the sink connector has been paused via the REST API, the worker 
> for the sink tasks pauses the consumer. When the connector is polled, the poll 
> request might time out and return no records. Connect then calls the task's 
> {{put(...)}} method (with no records), and this allows the task to optionally 
> call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls 
> resume, this will unexpectedly resume the paused consumer, causing the 
> consumer to return messages and the connector to process them, despite the 
> connector still being paused.
> This is reported against 1.0, but the affected code has not been changed 
> since at least 0.9.0.0.
> A workaround is to remove rather than pause a connector. It's inconvenient, 
> but it works.
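
For illustration, a minimal sketch of the pause/resume pattern the description 
refers to; the external-system check and the partition bookkeeping here are 
placeholders, not Connect API.

{code:java}
import java.util.Collection;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class BackpressureSinkTask extends SinkTask {
    private Collection<TopicPartition> assigned;

    @Override
    public void open(Collection<TopicPartition> partitions) {
        this.assigned = partitions;
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        TopicPartition[] tps = assigned.toArray(new TopicPartition[0]);
        if (externalSystemDown()) {
            // Back off: stop consuming until the external system recovers.
            context.pause(tps);
        } else {
            // put() is still invoked (with no records) while the connector is
            // paused via the REST API, so a resume() here is exactly what
            // this issue shows can wake a paused connector's consumer.
            context.resume(tps);
            write(records);
        }
    }

    abstract boolean externalSystemDown();

    abstract void write(Collection<SinkRecord> records);
}
{code}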



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-15 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401254#comment-16401254
 ] 

Ewen Cheslack-Postava commented on KAFKA-6661:
--

[~rhauch] Got this cherry-picked back to 0.10.0. If we want 0.9.0 as well, we 
probably need a separate PR since there was enough code movement to make it 
non-trivial.

On a related note, it wasn't too bad in this case, but for things we want to 
backport it's better to separate the must-have stuff from the nice-to-have 
improvements. It's not so much the risk (in this case it was just a toString 
and some log statements), but that the larger the patch, the less likely we get 
a clean cherry-pick, and that's a lot more time-consuming in cases where we 
want to cherry-pick through a bunch of release branches. In this case the pain 
was really caused by a different commit that made mostly cosmetic improvements, 
which caused cherry-pick conflicts; something to keep in mind in the future.

> Sink connectors that explicitly 'resume' topic partitions can resume a paused 
> task
> --
>
> Key: KAFKA-6661
> URL: https://issues.apache.org/jira/browse/KAFKA-6661
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
> Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to 
> explicitly pause and resume topic partitions. This is useful when connectors 
> need additional time processing the records for specific topic partitions 
> (e.g., the external system has an outage).
> However, when the sink connector has been paused via the REST API, the worker 
> for the sink tasks pauses the consumer. When the connector is polled, the poll 
> request might time out and return no records. Connect then calls the task's 
> {{put(...)}} method (with no records), and this allows the task to optionally 
> call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls 
> resume, this will unexpectedly resume the paused consumer, causing the 
> consumer to return messages and the connector to process them, despite the 
> connector still being paused.
> This is reported against 1.0, but the affected code has not been changed 
> since at least 0.9.0.0.
> A workaround is to remove rather than pause a connector. It's inconvenient, 
> but it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-15 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6661:
-
Fix Version/s: 1.1.1
   1.0.2
   0.11.0.3
   0.10.2.2
   0.10.1.2
   0.10.0.2

> Sink connectors that explicitly 'resume' topic partitions can resume a paused 
> task
> --
>
> Key: KAFKA-6661
> URL: https://issues.apache.org/jira/browse/KAFKA-6661
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Critical
> Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to 
> explicitly pause and resume topic partitions. This is useful when connectors 
> need additional time processing the records for specific topic partitions 
> (e.g., the external system has an outage).
> However, when the sink connector has been paused via the REST API, the worker 
> for the sink tasks pauses the consumer. When the connector is polled, the poll 
> request might time out and return no records. Connect then calls the task's 
> {{put(...)}} method (with no records), and this allows the task to optionally 
> call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls 
> resume, this will unexpectedly resume the paused consumer, causing the 
> consumer to return messages and the connector to process them, despite the 
> connector still being paused.
> This is reported against 1.0, but the affected code has not been changed 
> since at least 0.9.0.0.
> A workaround is to remove rather than pause a connector. It's inconvenient, 
> but it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions

2018-03-15 Thread Srinivas Dhruvakumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Srinivas Dhruvakumar updated KAFKA-6666:

Priority: Critical  (was: Major)

> OffsetOutOfRangeException: Replica Thread Stopped Resulting in 
> Underreplicated Partitions
> -
>
> Key: KAFKA-6666
> URL: https://issues.apache.org/jira/browse/KAFKA-6666
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Srinivas Dhruvakumar
>Priority: Critical
> Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png
>
>
> Hello All, 
> We are currently seeing a few under-replicated partitions on our test cluster, 
> which is used for integration testing. On further debugging, we found that the 
> replica thread was stopped due to an error:
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 50 of partition  since it is larger 
> than the high watermark -1
> Kindly find the attached screenshot. 
> !Screen Shot 2018-03-15 at 3.52.13 PM.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions

2018-03-15 Thread Srinivas Dhruvakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401235#comment-16401235
 ] 

Srinivas Dhruvakumar commented on KAFKA-6666:
-

This is consistently happening in our test cluster environment. 

> OffsetOutOfRangeException: Replica Thread Stopped Resulting in 
> Underreplicated Partitions
> -
>
> Key: KAFKA-6666
> URL: https://issues.apache.org/jira/browse/KAFKA-6666
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Srinivas Dhruvakumar
>Priority: Major
> Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png
>
>
> Hello All, 
> We are currently seeing a few under-replicated partitions on our test cluster, 
> which is used for integration testing. On further debugging, we found that the 
> replica thread was stopped due to an error:
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 50 of partition  since it is larger 
> than the high watermark -1
> Kindly find the attached screenshot. 
> !Screen Shot 2018-03-15 at 3.52.13 PM.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions

2018-03-15 Thread Srinivas Dhruvakumar (JIRA)
Srinivas Dhruvakumar created KAFKA-6666:
---

 Summary: OffsetOutOfRangeException: Replica Thread Stopped 
Resulting in Underreplicated Partitions
 Key: KAFKA-6666
 URL: https://issues.apache.org/jira/browse/KAFKA-6666
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.1
Reporter: Srinivas Dhruvakumar
 Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png

Hello All, 

We are currently seeing a few under-replicated partitions on our test cluster, 
which is used for integration testing. On further debugging, we found that the 
replica thread was stopped due to an error:

Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
increment the log start offset to 50 of partition  since it is larger than 
the high watermark -1

Kindly find the attached screenshot. 

!Screen Shot 2018-03-15 at 3.52.13 PM.png!

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently

2018-03-15 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392410#comment-16392410
 ] 

Ted Yu edited comment on KAFKA-6335 at 3/15/18 11:01 PM:
-

Happened again:

https://builds.apache.org/job/kafka-trunk-jdk9/480/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/


was (Author: yuzhih...@gmail.com):
Sigh - the above link is no longer accessible.

> SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails 
> intermittently
> --
>
> Key: KAFKA-6335
> URL: https://issues.apache.org/jira/browse/KAFKA-6335
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.2.0
>
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/
>  :
> {code}
> java.lang.AssertionError: expected acls Set(User:36 has Allow permission for 
> operations: Read from hosts: *, User:7 has Allow permission for operations: 
> Read from hosts: *, User:21 has Allow permission for operations: Read from 
> hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
> User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
> Allow permission for operations: Read from hosts: *, User:35 has Allow 
> permission for operations: Read from hosts: *, User:15 has Allow permission 
> for operations: Read from hosts: *, User:16 has Allow permission for 
> operations: Read from hosts: *, User:22 has Allow permission for operations: 
> Read from hosts: *, User:26 has Allow permission for operations: Read from 
> hosts: *, User:11 has Allow permission for operations: Read from hosts: *, 
> User:38 has Allow permission for operations: Read from hosts: *, User:8 has 
> Allow permission for operations: Read from hosts: *, User:28 has Allow 
> permission for operations: Read from hosts: *, User:32 has Allow permission 
> for operations: Read from hosts: *, User:25 has Allow permission for 
> operations: Read from hosts: *, User:41 has Allow permission for operations: 
> Read from hosts: *, User:44 has Allow permission for operations: Read from 
> hosts: *, User:48 has Allow permission for operations: Read from hosts: *, 
> User:2 has Allow permission for operations: Read from hosts: *, User:9 has 
> Allow permission for operations: Read from hosts: *, User:14 has Allow 
> permission for operations: Read from hosts: *, User:46 has Allow permission 
> for operations: Read from hosts: *, User:13 has Allow permission for 
> operations: Read from hosts: *, User:5 has Allow permission for operations: 
> Read from hosts: *, User:29 has Allow permission for operations: Read from 
> hosts: *, User:45 has Allow permission for operations: Read from hosts: *, 
> User:6 has Allow permission for operations: Read from hosts: *, User:37 has 
> Allow permission for operations: Read from hosts: *, User:23 has Allow 
> permission for operations: Read from hosts: *, User:19 has Allow permission 
> for operations: Read from hosts: *, User:24 has Allow permission for 
> operations: Read from hosts: *, User:17 has Allow permission for operations: 
> Read from hosts: *, User:34 has Allow permission for operations: Read from 
> hosts: *, User:12 has Allow permission for operations: Read from hosts: *, 
> User:42 has Allow permission for operations: Read from hosts: *, User:4 has 
> Allow permission for operations: Read from hosts: *, User:47 has Allow 
> permission for operations: Read from hosts: *, User:18 has Allow permission 
> for operations: Read from hosts: *, User:31 has Allow permission for 
> operations: Read from hosts: *, User:49 has Allow permission for operations: 
> Read from hosts: *, User:33 has Allow permission for operations: Read from 
> hosts: *, User:1 has Allow permission for operations: Read from hosts: *, 
> User:27 has Allow permission for operations: Read from hosts: *) but got 
> Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 
> has Allow permission for operations: Read from hosts: *, User:21 has Allow 
> permission for operations: Read from hosts: *, User:39 has Allow permission 
> for operations: Read from hosts: *, User:43 has Allow permission for 
> operations: Read from hosts: *, User:3 has Allow permission for operations: 
> Read from hosts: *, User:35 has Allow permission for operations: Read from 
> hosts: *, User:15 has Allow permission for operations: Read from hosts: *, 
> User:16 has Allow permission for operations: Read from hosts: *, User:22 has 
> Allow permission for operations: Read from hosts: *, User:26 has Allow 
> 

[jira] [Commented] (KAFKA-6661) Sink connectors that explicitly 'resume' topic partitions can resume a paused task

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401219#comment-16401219
 ] 

ASF GitHub Bot commented on KAFKA-6661:
---

ewencp closed pull request #4716: KAFKA-6661: Ensure sink connectors don’t 
resume consumer when task is paused
URL: https://github.com/apache/kafka/pull/4716
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2995a4e813a..2ba785c4668 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -130,7 +130,7 @@ public void initialize(TaskConfig taskConfig) {
 try {
 this.taskConfig = taskConfig.originalsStrings();
 this.consumer = createConsumer();
-this.context = new WorkerSinkTaskContext(consumer);
+this.context = new WorkerSinkTaskContext(consumer, this);
 } catch (Throwable t) {
 log.error("{} Task failed initialization and will not be 
started.", this, t);
 onFailure(t);
@@ -601,7 +601,7 @@ SinkTaskMetricsGroup sinkTaskMetricsGroup() {
 private class HandleRebalance implements ConsumerRebalanceListener {
 @Override
 public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-log.debug("{} Partitions assigned", WorkerSinkTask.this);
+log.debug("{} Partitions assigned {}", WorkerSinkTask.this, 
partitions);
 lastCommittedOffsets = new HashMap<>();
 currentOffsets = new HashMap<>();
 for (TopicPartition tp : partitions) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 08789497645..386f992e82a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.IllegalWorkerStateException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,26 +31,32 @@
 import java.util.Set;
 
 public class WorkerSinkTaskContext implements SinkTaskContext {
+
+private final Logger log = LoggerFactory.getLogger(getClass());
 private Map<TopicPartition, Long> offsets;
 private long timeoutMs;
 private KafkaConsumer<byte[], byte[]> consumer;
+private final WorkerSinkTask sinkTask;
 private final Set<TopicPartition> pausedPartitions;
 private boolean commitRequested;
 
-public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
+public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask) {
 this.offsets = new HashMap<>();
 this.timeoutMs = -1L;
 this.consumer = consumer;
+this.sinkTask = sinkTask;
 this.pausedPartitions = new HashSet<>();
 }
 
 @Override
 public void offset(Map<TopicPartition, Long> offsets) {
+log.debug("{} Setting offsets for topic partitions {}", this, offsets);
 this.offsets.putAll(offsets);
 }
 
 @Override
 public void offset(TopicPartition tp, long offset) {
+log.debug("{} Setting offset for topic partition {} to {}", this, tp, 
offset);
 offsets.put(tp, offset);
 }
 
@@ -66,6 +74,7 @@ public void clearOffsets() {
 
 @Override
 public void timeout(long timeoutMs) {
+log.debug("{} Setting timeout to {} ms", this, timeoutMs);
 this.timeoutMs = timeoutMs;
 }
 
@@ -92,7 +101,12 @@ public void pause(TopicPartition... partitions) {
 }
 try {
 Collections.addAll(pausedPartitions, partitions);
-consumer.pause(Arrays.asList(partitions));
+if (sinkTask.shouldPause()) {
+log.debug("{} Connector is paused, so not pausing consumer's 
partitions {}", this, partitions);
+} else {
+consumer.pause(Arrays.asList(partitions));
+log.debug("{} Pausing partitions {}. Connector is not 
paused.", this, partitions);
+}
 } catch (IllegalStateException e) {
 throw new IllegalWorkerStateException("SinkTasks may not pause 
partitions that are not 

[jira] [Updated] (KAFKA-6608) Add TimeoutException to KafkaConsumer#position()

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6608:
---
Description: 
In KAFKA-4879, the Kafka Consumer hangs indefinitely due to Fetcher's 
{{timeout}} being set to {{Long.MAX_VALUE}}. While fixing this issue, it was 
pointed out that if a timeout were added to the methods which commit offsets 
synchronously, stricter control over time could be achieved.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886

  was:In KAFKA-4879, the Kafka Consumer hangs indefinitely due to Fetcher's 
{{timeout}} being set to {{Long.MAX_VALUE}}. While fixing this issue, it was 
pointed out that if a timeout were added to the methods which commit offsets 
synchronously, stricter control over time could be achieved.


> Add TimeoutException to KafkaConsumer#position()
> 
>
> Key: KAFKA-6608
> URL: https://issues.apache.org/jira/browse/KAFKA-6608
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Blocker
>  Labels: kip
>
> In KAFKA-4879, the Kafka Consumer hangs indefinitely due to Fetcher's 
> {{timeout}} being set to {{Long.MAX_VALUE}}. While fixing this issue, it was 
> pointed out that if a timeout were added to the methods which commit offsets 
> synchronously, stricter control over time could be achieved.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886
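
A sketch of the bounded call this direction leads to; the Duration overload 
of position() and its TimeoutException shown below are the proposed addition, 
not an API that exists at the time of this thread, and the broker address, 
topic, and timeout are placeholders.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

public class PositionWithTimeout {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "position-demo");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("demo-topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            try {
                // Proposed bounded variant; the existing position(tp) can
                // block indefinitely if the offset lookup never completes.
                long pos = consumer.position(tp, Duration.ofSeconds(5));
                System.out.println("position = " + pos);
            } catch (TimeoutException e) {
                System.err.println("position() timed out: " + e.getMessage());
            }
        }
    }
}
{code}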



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6608) Add TimeoutException to KafkaConsumer#position()

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6608:
---
Labels: kip  (was: needs-discussion needs-kip)

> Add TimeoutException to KafkaConsumer#position()
> 
>
> Key: KAFKA-6608
> URL: https://issues.apache.org/jira/browse/KAFKA-6608
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Blocker
>  Labels: kip
>
> In KAFKA-4879, the Kafka Consumer hangs indefinitely due to Fetcher's 
> {{timeout}} being set to {{Long.MAX_VALUE}}. While fixing this issue, it was 
> pointed out that if a timeout were added to the methods which commit offsets 
> synchronously, stricter control over time could be achieved.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6106.

Resolution: Fixed

> Postpone normal processing of tasks within a thread until restoration of all 
> tasks have completed
> -
>
> Key: KAFKA-6106
> URL: https://issues.apache.org/jira/browse/KAFKA-6106
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Guozhang Wang
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: newbie++
> Fix For: 1.2.0, 1.1.1
>
>
> Let's say a stream thread hosts multiple tasks, A and B. At the very 
> beginning, when A and B are assigned to the thread, the thread state is 
> {{TASKS_ASSIGNED}}, and the thread starts restoring these two tasks during 
> this state using the restore consumer, while using the normal consumer for 
> heartbeating.
> If task A's restoration completes earlier than task B's, then the thread 
> will start processing A immediately, even while it is still in the 
> {{TASKS_ASSIGNED}} phase. But processing task A will slow down restoration of 
> task B, since the thread is single-threaded. So the thread's transition to 
> {{RUNNING}}, which happens once all of its assigned tasks have completed 
> restoring and can be processed, will be delayed.
> Note that the streams instance's state will only transition to {{RUNNING}} 
> when all of its threads have transitioned to {{RUNNING}}, so the instance's 
> transition will also be delayed by this scenario.
> We'd better not start processing ready tasks immediately, but instead 
> focus on restoration during the {{TASKS_ASSIGNED}} state, to shorten the 
> overall time of the instance's state transition.
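
For reference, a minimal sketch of observing the instance-level transition 
described above via a state listener; the topology, application id, and 
broker address are placeholders.

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class RestoreThenRun {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "restore-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.table("input-topic"); // a stateful task that needs restoration

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // The instance only reports RUNNING once every stream thread has
        // finished restoring its tasks and transitioned to RUNNING itself.
        streams.setStateListener((State newState, State oldState) -> {
            if (newState == State.RUNNING) {
                System.out.println("All threads RUNNING; restoration complete");
            }
        });
        streams.start();
    }
}
{code}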



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6106:
---
Fix Version/s: 1.2.0
   1.1.1

> Postpone normal processing of tasks within a thread until restoration of all 
> tasks have completed
> -
>
> Key: KAFKA-6106
> URL: https://issues.apache.org/jira/browse/KAFKA-6106
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Guozhang Wang
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: newbie++
> Fix For: 1.2.0, 1.1.1
>
>
> Let's say a stream thread hosts multiple tasks, A and B. At the very 
> beginning, when A and B are assigned to the thread, the thread state is 
> {{TASKS_ASSIGNED}}, and the thread starts restoring these two tasks during 
> this state using the restore consumer, while using the normal consumer for 
> heartbeating.
> If task A's restoration completes earlier than task B's, then the thread 
> will start processing A immediately, even while it is still in the 
> {{TASKS_ASSIGNED}} phase. But processing task A will slow down restoration of 
> task B, since the thread is single-threaded. So the thread's transition to 
> {{RUNNING}}, which happens once all of its assigned tasks have completed 
> restoring and can be processed, will be delayed.
> Note that the streams instance's state will only transition to {{RUNNING}} 
> when all of its threads have transitioned to {{RUNNING}}, so the instance's 
> transition will also be delayed by this scenario.
> We'd better not start processing ready tasks immediately, but instead 
> focus on restoration during the {{TASKS_ASSIGNED}} state, to shorten the 
> overall time of the instance's state transition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401033#comment-16401033
 ] 

ASF GitHub Bot commented on KAFKA-6106:
---

mjsax closed pull request #4651: KAFKA-6106: Postpone normal processing of 
tasks until restoration of all tasks completed
URL: https://github.com/apache/kafka/pull/4651
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index c806bfde47e..92045713146 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -73,27 +73,12 @@ void addNewTask(final T task) {
 created.put(task.id(), task);
 }
 
-Set<TopicPartition> uninitializedPartitions() {
-if (created.isEmpty()) {
-return Collections.emptySet();
-}
-final Set<TopicPartition> partitions = new HashSet<>();
-for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
-if (entry.getValue().hasStateStores()) {
-partitions.addAll(entry.getValue().partitions());
-}
-}
-return partitions;
-}
-
 /**
- * @return partitions that are ready to be resumed
  * @throws IllegalStateException If store gets registered after 
initialized is already finished
  * @throws StreamsException if the store's change log does not contain the 
partition
  * @throws TaskMigratedException if the task producer got fenced (EOS only)
  */
-Set<TopicPartition> initializeNewTasks() {
-final Set<TopicPartition> readyPartitions = new HashSet<>();
+void initializeNewTasks() {
 if (!created.isEmpty()) {
 log.debug("Initializing {}s {}", taskTypeName, created.keySet());
 }
@@ -104,7 +89,7 @@ void addNewTask(final T task) {
 log.debug("Transitioning {} {} to restoring", 
taskTypeName, entry.getKey());
 addToRestoring(entry.getValue());
 } else {
-transitionToRunning(entry.getValue(), readyPartitions);
+transitionToRunning(entry.getValue());
 }
 it.remove();
 } catch (final LockException e) {
@@ -112,21 +97,19 @@ void addNewTask(final T task) {
 log.trace("Could not create {} {} due to {}; will retry", 
taskTypeName, entry.getKey(), e.getMessage());
 }
 }
-return readyPartitions;
 }
 
-Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
+void updateRestored(final Collection<TopicPartition> restored) {
 if (restored.isEmpty()) {
-return Collections.emptySet();
+return;
 }
 log.trace("{} changelog partitions that have completed restoring so 
far: {}", taskTypeName, restored);
-final Set<TopicPartition> resume = new HashSet<>();
 restoredPartitions.addAll(restored);
 for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
 final Map.Entry<TaskId, T> entry = it.next();
 final T task = entry.getValue();
 if (restoredPartitions.containsAll(task.changelogPartitions())) {
-transitionToRunning(task, resume);
+transitionToRunning(task);
 it.remove();
 log.trace("{} {} completed restoration as all its changelog 
partitions {} have been applied to restore state",
 taskTypeName,
@@ -146,7 +129,6 @@ void addNewTask(final T task) {
 if (allTasksRunning()) {
 restoredPartitions.clear();
 }
-return resume;
 }
 
 boolean allTasksRunning() {
@@ -243,7 +225,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
 suspended.remove(taskId);
 task.resume();
 try {
-transitionToRunning(task, new HashSet<TopicPartition>());
+transitionToRunning(task);
 } catch (final TaskMigratedException e) {
 // we need to catch migration exception internally since 
this function
 // is triggered in the rebalance callback
@@ -278,15 +260,12 @@ private void addToRestoring(final T task) {
 /**
  * @throws TaskMigratedException if the task producer got fenced (EOS only)
  */
-private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
+private void transitionToRunning(final T task) {
 log.debug("transitioning {} {} to 

[jira] [Assigned] (KAFKA-6664) KIP-269 Substitution Within Configuration Values

2018-03-15 Thread Ron Dagostino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino reassigned KAFKA-6664:


Assignee: Ron Dagostino

> KIP-269 Substitution Within Configuration Values
> 
>
> Key: KAFKA-6664
> URL: https://issues.apache.org/jira/browse/KAFKA-6664
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>
> KIP 269 (Substitution Within Configuration Values) proposes adding support 
> for substitution within client JAAS configuration values for PLAIN and 
> SCRAM-related SASL mechanisms in a backwards-compatible manner and making the 
> functionality available to other existing (or future) configuration contexts 
> where it is deemed appropriate.
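
For context, a sketch of the kind of client configuration value the KIP 
targets: a SASL/PLAIN JAAS value whose hard-coded credentials (placeholders 
here) substitution would let you externalize. The substitution syntax itself 
is defined by the KIP and is not shown.

{code:java}
import java.util.Properties;

public class SaslPlainClientConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Hard-coded credentials (placeholders). KIP-269 is about allowing
        // values like the password below to be substituted in from another
        // source instead of being written out literally.
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"alice\" password=\"alice-secret\";");
    }
}
{code}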



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6562) KIP-255: OAuth Authentication via SASL/OAUTHBEARER

2018-03-15 Thread Ron Dagostino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino reassigned KAFKA-6562:


Assignee: Ron Dagostino

> KIP-255: OAuth Authentication via SASL/OAUTHBEARER
> --
>
> Key: KAFKA-6562
> URL: https://issues.apache.org/jira/browse/KAFKA-6562
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>
> KIP-255: OAuth Authentication via SASL/OAUTHBEARER 
> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876) 
> proposes adding the ability to authenticate to Kafka with OAuth 2 bearer 
> tokens using the OAUTHBEARER SASL mechanism.  Token retrieval and token 
> validation are both pluggable.
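
For context, a sketch of what a client configured for the proposed 
OAUTHBEARER mechanism might look like, assuming the login module named in the 
KIP; real deployments would plug in their own token retrieval and validation 
callbacks, and the broker address is a placeholder.

{code:java}
import java.util.Properties;

public class OauthBearerClientConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_SSL");
        // The OAUTHBEARER mechanism and login module are what the KIP adds;
        // token retrieval and validation are pluggable, so production use
        // configures custom callbacks rather than any development default.
        props.put("sasl.mechanism", "OAUTHBEARER");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
    }
}
{code}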



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6665) LeaderChangeListener.handleDataDeleted deadlock

2018-03-15 Thread Dmitry Konstantinov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Konstantinov updated KAFKA-6665:
---
Attachment: thread_dump.txt

> LeaderChangeListener.handleDataDeleted deadlock
> ---
>
> Key: KAFKA-6665
> URL: https://issues.apache.org/jira/browse/KAFKA-6665
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: Linux 3.10.0-327.10.1.el7.x86_64 #1 SMP Sat Jan 23 
> 04:54:55 EST 2016 x86_64 x86_64 x86_64 GNU/Linux
> java version "1.8.0_131"
> Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
> Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
>Reporter: Dmitry Konstantinov
>Priority: Major
> Attachments: thread_dump.txt
>
>
> Leader election logic may cause the following deadlock:
> ZkClient-EventThread triggers kafka.utils.KafkaScheduler.shutdown under a 
> global lock (kafka.utils.CoreUtils.inLock() on 
> controllerContext.controllerLock):
> {code:java}
> "ZkClient-EventThread-20-cube:2181" #20 daemon prio=5 os_prio=0 
> tid=0x7f656cb9c000 nid=0x4f1 waiting on condition [0x7f652df28000]
>java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xd437a898> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at 
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:373)
> at 
> kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:168)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:145)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>  {code}
> The shutdown awaits task termination with a huge timeout while holding the lock: 
> [https://github.com/apache/kafka/blob/0.10.1.0/core/src/main/scala/kafka/utils/KafkaScheduler.scala#L98]
> {code:java}
> cachedExecutor.awaitTermination(1, TimeUnit.DAYS)
> {code}
> Tasks within the stopping scheduler thread pool try to take the same global 
> lock and block, so the pool never terminates:
> {code:java}
> "kafka-scheduler-293" #1249 daemon prio=5 os_prio=0 tid=0x7f64d8054800 
> nid=0x7c72 waiting on condition [0x7f6406ce3000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0xc84f7ef8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:232)
> at 
> kafka.controller.KafkaController.isActive(KafkaController.scala:400)
> at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1178)
> at 
> kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:347)
> at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at 

[jira] [Created] (KAFKA-6665) LeaderChangeListener.handleDataDeleted deadlock

2018-03-15 Thread Dmitry Konstantinov (JIRA)
Dmitry Konstantinov created KAFKA-6665:
--

 Summary: LeaderChangeListener.handleDataDeleted deadlock
 Key: KAFKA-6665
 URL: https://issues.apache.org/jira/browse/KAFKA-6665
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.1.0
 Environment: Linux 3.10.0-327.10.1.el7.x86_64 #1 SMP Sat Jan 23 
04:54:55 EST 2016 x86_64 x86_64 x86_64 GNU/Linux

java version "1.8.0_131"
Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)

Reporter: Dmitry Konstantinov


Leader election logic may cause the following deadlock:

ZkClient-EventThread triggers kafka.utils.KafkaScheduler.shutdown under a 
global lock (kafka.utils.CoreUtils.inLock() on controllerContext.controllerLock):
{code:java}
"ZkClient-EventThread-20-cube:2181" #20 daemon prio=5 os_prio=0 
tid=0x7f656cb9c000 nid=0x4f1 waiting on condition [0x7f652df28000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xd437a898> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at 
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1465)
at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)
at 
kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:373)
at 
kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:168)
at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:145)
at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at 
kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:824)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
 {code}
The shutdown awaits task termination with a huge timeout while holding the lock: 
[https://github.com/apache/kafka/blob/0.10.1.0/core/src/main/scala/kafka/utils/KafkaScheduler.scala#L98]
{code:java}
cachedExecutor.awaitTermination(1, TimeUnit.DAYS)
{code}
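Distilled, the hang needs only two ingredients: a thread that takes a global lock 
and then awaits executor termination, and a scheduled task that needs the same 
lock. A minimal, self-contained Java sketch of that pattern (class, lock, and 
timing are illustrative, not Kafka's actual code):
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ShutdownUnderLockDeadlock {
    static final ReentrantLock controllerLock = new ReentrantLock();

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Scheduled task: needs the same global lock the shutdown path holds
        // (stands in for the rebalance check in the trace below).
        scheduler.schedule(() -> {
            controllerLock.lock();
            try { /* work */ } finally { controllerLock.unlock(); }
        }, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(50);
        controllerLock.lock();  // stands in for ZkClient-EventThread taking controllerLock
        try {
            scheduler.shutdown();
            // Hangs: the queued task can never finish because it waits on
            // controllerLock, which this thread holds while waiting here.
            scheduler.awaitTermination(1, TimeUnit.DAYS);
        } finally {
            controllerLock.unlock();
        }
    }
}
{code}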
Tasks within the stopping scheduler thread pool try to take the same global 
lock and block, so the pool never terminates:
{code:java}
"kafka-scheduler-293" #1249 daemon prio=5 os_prio=0 tid=0x7f64d8054800 
nid=0x7c72 waiting on condition [0x7f6406ce3000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xc84f7ef8> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:232)
at kafka.controller.KafkaController.isActive(KafkaController.scala:400)
at 
kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1178)
at 
kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:347)
at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 

[jira] [Commented] (KAFKA-6660) Monitoring number of messages expired due to retention policy

2018-03-15 Thread Matt Garbis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400952#comment-16400952
 ] 

Matt Garbis commented on KAFKA-6660:


[~mjsax] – could you (or someone else) please review this when you have a 
chance? Thanks!!

> Monitoring number of messages expired due to retention policy
> -
>
> Key: KAFKA-6660
> URL: https://issues.apache.org/jira/browse/KAFKA-6660
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Matt Garbis
>Priority: Major
>
> I have not been able to find this out, but is there a way to monitor how many 
> messages were expired based on the retention policy? If not, JMX metrics like 
> this would be very useful for my team. I would like to be able to filter this 
> by topic and broker. Something like:
> {code:java}
> kafka.server:type=BrokerTopicMetrics,name=MessagesExpiredPerSec{code}
>  
> Additionally taking this one step further, it would be cool to be able to 
> monitor how many messages a consumer group did not consume before they were 
> expired.
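For context: the metric name above is only this ticket's proposal and does not 
exist in the broker. A short Java sketch of how such a per-topic rate would be 
read over JMX, polling the existing 
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec mbean the same way 
(host, port, and topic are illustrative):
{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerRateProbe {
    public static void main(String[] args) throws Exception {
        // Connect to a broker started with JMX enabled on port 9999.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // An existing per-topic rate, read for comparison:
            ObjectName messagesIn = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=my-topic");
            System.out.println("in rate = " + conn.getAttribute(messagesIn, "OneMinuteRate"));
            // A hypothetical MessagesExpiredPerSec, as requested in this
            // ticket, would be read with the same pattern.
        }
    }
}
{code}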



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6663) Expression for GlobalKTable is not correct

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6663:
---
Component/s: streams

> Expression for GlobalKTable is not correct
> --
>
> Key: KAFKA-6663
> URL: https://issues.apache.org/jira/browse/KAFKA-6663
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: huxihx
>Priority: Minor
>  Labels: documentation
>
> In [this stream doc 
> section|https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api#creating-source-streams-from-kafka],
>   when reading records from Kafka to a global KTable, the doc says:
> `In the case of a GlobalKTable, the local GlobalKTable instance of every 
> application instance will be populated with data from only a *subset* of the 
> partitions of the input topic. Collectively, across all application 
> instances, all input topic partitions are read and processed.`
> Is this correct? Does each GlobalKTable instance really get assigned only a 
> subset of the partitions of the input topic? I believe it should be able to 
> consume all the partitions of the input topic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6663) Expression for GlobalKTable is not correct

2018-03-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400914#comment-16400914
 ] 

Matthias J. Sax commented on KAFKA-6663:


You are right. It should be `all` partitions.
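
For reference, a minimal Streams DSL sketch (topic name and value type are 
illustrative); every application instance materializes the global table from all 
partitions of the input topic:
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;

public class GlobalTableExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Unlike a regular KTable, this is populated from ALL partitions of
        // the topic on every instance, not just an assigned subset.
        GlobalKTable<String, Long> wordCounts =
                builder.globalTable("word-counts-topic");
        // ... join streams against wordCounts, then build and start the topology
    }
}
{code}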

> Expression for GlobalKTable is not correct
> --
>
> Key: KAFKA-6663
> URL: https://issues.apache.org/jira/browse/KAFKA-6663
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: huxihx
>Priority: Minor
>  Labels: documentation
>
> In [this stream doc 
> section|https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api#creating-source-streams-from-kafka],
>   when reading records from Kafka to a global KTable, the doc says:
> `In the case of a GlobalKTable, the local GlobalKTable instance of every 
> application instance will be populated with data from only a *subset* of the 
> partitions of the input topic. Collectively, across all application 
> instances, all input topic partitions are read and processed.`
> Is this correct? Does each GlobalKTable instance really get assigned only a 
> subset of the partitions of the input topic? I believe it should be able to 
> consume all the partitions of the input topic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-03-15 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400912#comment-16400912
 ] 

John Roesler commented on KAFKA-6474:
-

Hi Filipe,

I hope you're doing well.

I'm trying to figure out why this is a problem for me but not for you... can 
you share the exact command you're using to run the tests?

Thanks,

-John

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our own 
> tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean (Windows OS)

2018-03-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400900#comment-16400900
 ] 

Guozhang Wang commented on KAFKA-6647:
--

[~mjsax] [~gbloggs] As explained in the link I posted previously, there is a 
difference in how file systems handle deletes on Linux vs. Windows. Cross-posting 
it here:

{code}
Unix deletes the file name immediately, while Windows deletes the file name 
only when the last handle is closed. It does, however, prevent you from opening 
a file with the same name until the last handle to the (deleted) file is closed.
{code}
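
A tiny Java sketch makes that difference observable: hold an open channel (and 
file lock) on a .lock file, as StateDirectory does, and try to delete it (paths 
are illustrative; the printed outcome depends on the OS):
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class OpenHandleDeleteDemo {
    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-dir");
        Path lockFile = dir.resolve(".lock");
        try (FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.lock();
            try {
                // Unix unlinks the name immediately; Windows typically fails
                // here (e.g. AccessDeniedException) while the handle is open.
                Files.delete(lockFile);
                System.out.println("delete succeeded while handle was open");
            } catch (IOException e) {
                System.out.println("delete failed while handle was open: " + e);
            }
        }
        // Once the handle is closed, cleanup succeeds on both systems.
        Files.deleteIfExists(lockFile);
        Files.delete(dir);
    }
}
{code}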

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean 
> (Windows OS)
> -
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream, the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>   if (lock(id, 0)) {
> long now = time.milliseconds();
> long lastModifiedMs = taskDir.lastModified();
> if (now > lastModifiedMs + cleanupDelayMs) {
> log.info("{} Deleting obsolete state directory {} 
> for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), 
> dirName, id, now - lastModifiedMs, cleanupDelayMs);
> Utils.delete(taskDir);
> }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that 
> is subsequently going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete the .lock file fails with 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory  
>  : stream-thread [restartedMain] Failed to lock the state directory due 
> to an unexpected exception)
> This seems to then cause issues using streams from a topic to an in-memory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6664) KIP-269 Substitution Within Configuration Values

2018-03-15 Thread Ron Dagostino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino updated KAFKA-6664:
-
Description: KIP 269 (Substitution Within Configuration Values) proposes 
adding support for substitution within client JAAS configuration values for 
PLAIN and SCRAM-related SASL mechanisms in a backwards-compatible manner and 
making the functionality available to other existing (or future) configuration 
contexts where it is deemed appropriate.  (was: KIP 270 (Substitution Within 
Configuration Values) proposes adding support for substitution within client 
JAAS configuration values for PLAIN and SCRAM-related SASL mechanisms in a 
backwards-compatible manner and making the functionality available to other 
existing (or future) configuration contexts where it is deemed appropriate.)
Summary: KIP-269 Substitution Within Configuration Values  (was: 
KIP-270 Substitution Within Configuration Values)

> KIP-269 Substitution Within Configuration Values
> 
>
> Key: KAFKA-6664
> URL: https://issues.apache.org/jira/browse/KAFKA-6664
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Ron Dagostino
>Priority: Major
>
> KIP 269 (Substitution Within Configuration Values) proposes adding support 
> for substitution within client JAAS configuration values for PLAIN and 
> SCRAM-related SASL mechanisms in a backwards-compatible manner and making the 
> functionality available to other existing (or future) configuration contexts 
> where it is deemed appropriate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6664) KIP-270 Substitution Within Configuration Values

2018-03-15 Thread Ron Dagostino (JIRA)
Ron Dagostino created KAFKA-6664:


 Summary: KIP-270 Substitution Within Configuration Values
 Key: KAFKA-6664
 URL: https://issues.apache.org/jira/browse/KAFKA-6664
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Ron Dagostino


KIP 270 (Substitution Within Configuration Values) proposes adding support for 
substitution within client JAAS configuration values for PLAIN and 
SCRAM-related SASL mechanisms in a backwards-compatible manner and making the 
functionality available to other existing (or future) configuration contexts 
where it is deemed appropriate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean (Windows OS)

2018-03-15 Thread George Bloggs (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400819#comment-16400819
 ] 

George Bloggs commented on KAFKA-6647:
--

I will check this in the morning and update this issue. The SO post does look 
like it describes what I was seeing, but I'm unable to test it this evening as I 
don't have access to the code.

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean 
> (Windows OS)
> -
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream, the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>   if (lock(id, 0)) {
> long now = time.milliseconds();
> long lastModifiedMs = taskDir.lastModified();
> if (now > lastModifiedMs + cleanupDelayMs) {
> log.info("{} Deleting obsolete state directory {} 
> for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), 
> dirName, id, now - lastModifiedMs, cleanupDelayMs);
> Utils.delete(taskDir);
> }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that 
> is subsequently going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete the .lock file fails with 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory  
>  : stream-thread [restartedMain] Failed to lock the state directory due 
> to an unexpected exception)
> This seems to then cause issues using streams from a topic to an in-memory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6656) Use non-zero status code when kafka-configs.sh fails

2018-03-15 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6656.

   Resolution: Fixed
Fix Version/s: 1.2.0

> Use non-zero status code when kafka-configs.sh fails
> 
>
> Key: KAFKA-6656
> URL: https://issues.apache.org/jira/browse/KAFKA-6656
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.2.0
>
>
> Currently we return status 0 from kafka-configs.sh even if the command raises 
> an error. It would be better to use a non-zero status code so that it can be 
> scripted more easily.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6656) Use non-zero status code when kafka-configs.sh fails

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400785#comment-16400785
 ] 

ASF GitHub Bot commented on KAFKA-6656:
---

hachikuji closed pull request #4711: KAFKA-6656; Config tool should return 
non-zero status code on failure
URL: https://github.com/apache/kafka/pull/4711
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index ddf6dcd4d3e..044be6a5ba2 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -25,11 +25,11 @@ import kafka.common.Config
 import kafka.common.InvalidConfigException
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils.CommandLineUtils
+import kafka.utils.{CommandLineUtils, Exit}
 import kafka.utils.Implicits._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AlterConfigsOptions, Config => JConfig, 
ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient}
+import org.apache.kafka.clients.admin.{AlterConfigsOptions, ConfigEntry, 
DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
@@ -65,35 +65,43 @@ object ConfigCommand extends Config {
 DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)
 
   def main(args: Array[String]): Unit = {
+try {
+  val opts = new ConfigCommandOptions(args)
 
-val opts = new ConfigCommandOptions(args)
-
-if(args.length == 0)
-  CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config 
for a topic, client, user or broker")
-
-opts.checkArgs()
-
-val time = Time.SYSTEM
+  if (args.length == 0)
+CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity 
config for a topic, client, user or broker")
 
-if (opts.options.has(opts.zkConnectOpt)) {
-  val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), 
JaasUtils.isZkSecurityEnabled, 3, 3,
-Int.MaxValue, time)
-  val adminZkClient = new AdminZkClient(zkClient)
+  opts.checkArgs()
 
-  try {
-if (opts.options.has(opts.alterOpt))
-  alterConfig(zkClient, opts, adminZkClient)
-else if (opts.options.has(opts.describeOpt))
-  describeConfig(zkClient, opts, adminZkClient)
-  } catch {
-case e: Throwable =>
-  println("Error while executing config command " + e.getMessage)
-  println(Utils.stackTrace(e))
-  } finally {
-zkClient.close()
+  if (opts.options.has(opts.zkConnectOpt)) {
+processCommandWithZk(opts.options.valueOf(opts.zkConnectOpt), opts)
+  } else {
+processBrokerConfig(opts)
   }
-} else {
-  processBrokerConfig(opts)
+} catch {
+  case e @ (_: IllegalArgumentException | _: InvalidConfigException | _: 
OptionException) =>
+logger.debug(s"Failed config command with args $args", e)
+System.err.println(e.getMessage)
+Exit.exit(1)
+
+  case t: Throwable =>
+System.err.println(s"Error while executing config command with args 
$args")
+t.printStackTrace(System.err)
+Exit.exit(1)
+}
+  }
+
+  private def processCommandWithZk(zkConnectString: String, opts: 
ConfigCommandOptions): Unit = {
+val zkClient = KafkaZkClient(zkConnectString, 
JaasUtils.isZkSecurityEnabled, 3, 3,
+  Int.MaxValue, Time.SYSTEM)
+val adminZkClient = new AdminZkClient(zkClient)
+try {
+  if (opts.options.has(opts.alterOpt))
+alterConfig(zkClient, opts, adminZkClient)
+  else if (opts.options.has(opts.describeOpt))
+describeConfig(zkClient, opts, adminZkClient)
+} finally {
+  zkClient.close()
 }
   }
 
@@ -217,14 +225,9 @@ object ConfigCommand extends Config {
 alterBrokerConfig(adminClient, opts, entityName)
   else if (opts.options.has(opts.describeOpt))
 describeBrokerConfig(adminClient, opts, entityName)
-} catch {
-  case e: Throwable =>
-println("Error while executing config command " + e.getMessage)
-println(Utils.stackTrace(e))
 } finally {
   adminClient.close()
 }
-
   }
 
   private[admin] def alterBrokerConfig(adminClient: JAdminClient, opts: 
ConfigCommandOptions, entityName: String) {
diff --git 

[jira] [Updated] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean (Windows OS)

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6647:
---
Summary: KafkaStreams.cleanUp creates .lock file in directory its trying to 
clean (Windows OS)  (was: KafkaStreams.cleanUp creates .lock file in directory 
its trying to clean)

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean 
> (Windows OS)
> -
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream, the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>   if (lock(id, 0)) {
> long now = time.milliseconds();
> long lastModifiedMs = taskDir.lastModified();
> if (now > lastModifiedMs + cleanupDelayMs) {
> log.info("{} Deleting obsolete state directory {} 
> for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), 
> dirName, id, now - lastModifiedMs, cleanupDelayMs);
> Utils.delete(taskDir);
> }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that 
> is subsequently going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete the .lock file fails with 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory  
>  : stream-thread [restartedMain] Failed to lock the state directory due 
> to an unexpected exception)
> This seems to then cause issues using streams from a topic to an in-memory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6655) CleanupThread: Failed to lock the state directory due to an unexpected exception (Windows OS)

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6655:
---
Description: 
This issue happens on Windows OS. The code works fine on Linux. This ticket is 
related to KAFKA-6647, which also reports locking issues on Windows. However, 
there the issue occurs when a user calls KafkaStreams#cleanUp() explicitly, while 
this ticket is about the KafkaStreams background CleanupThread (note that both 
use `StateDirectory.cleanRemovedTasks`, but the behavior is still slightly 
different as different parameters are passed into the method).
{quote}[CleanupThread] Failed to lock the state directory due to an unexpected 
exceptionjava.nio.file.DirectoryNotEmptyException: 
\tmp\kafka-streams\srini-20171208\0_9
 at 
sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
 at 
sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
 at java.nio.file.Files.delete(Files.java:1126)
 at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:636)
 at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:619)
 at java.nio.file.Files.walkFileTree(Files.java:2688)
 at java.nio.file.Files.walkFileTree(Files.java:2742)
 at org.apache.kafka.common.utils.Utils.delete(Utils.java:619)
 at 
org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:245)
 at org.apache.kafka.streams.KafkaStreams$3.run(KafkaStreams.java:761)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
{quote}

  was:
CleanupThread] Failed to lock the state directory due to an unexpected exception

java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\srini-20171208\0_9
 at 
sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
 at 
sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
 at java.nio.file.Files.delete(Files.java:1126)
 at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:636)
 at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:619)
 at java.nio.file.Files.walkFileTree(Files.java:2688)
 at java.nio.file.Files.walkFileTree(Files.java:2742)
 at org.apache.kafka.common.utils.Utils.delete(Utils.java:619)
 at 
org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:245)
 at org.apache.kafka.streams.KafkaStreams$3.run(KafkaStreams.java:761)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)


> CleanupThread: Failed to lock the state directory due to an unexpected 
> exception (Windows OS)
> -
>
> Key: KAFKA-6655
> URL: https://issues.apache.org/jira/browse/KAFKA-6655
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: Windows Operating System
>Reporter: Srini
>Priority: Major
>
> This issue happens on Windows OS. The code works fine on Linux. This ticket is 
> related to KAFKA-6647, which also reports locking issues on Windows. However, 
> there the issue occurs when a user calls KafkaStreams#cleanUp() explicitly, 
> while this ticket is about the KafkaStreams background CleanupThread (note 
> that both use `StateDirectory.cleanRemovedTasks`, but the behavior is still 
> slightly different as different parameters are passed into the method).
> {quote}[CleanupThread] Failed to lock the state directory due to an 
> unexpected exceptionjava.nio.file.DirectoryNotEmptyException: 
> \tmp\kafka-streams\srini-20171208\0_9
>  at 
> sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
>  at 
> 

[jira] [Reopened] (KAFKA-6655) CleanupThread] Failed to lock the state directory due to an unexpected exception

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-6655:


> CleanupThread] Failed to lock the state directory due to an unexpected 
> exception
> 
>
> Key: KAFKA-6655
> URL: https://issues.apache.org/jira/browse/KAFKA-6655
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Srini
>Priority: Major
>
> CleanupThread] Failed to lock the state directory due to an unexpected 
> exception
> java.nio.file.DirectoryNotEmptyException: 
> \tmp\kafka-streams\srini-20171208\0_9
>  at 
> sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
>  at 
> sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>  at java.nio.file.Files.delete(Files.java:1126)
>  at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:636)
>  at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:619)
>  at java.nio.file.Files.walkFileTree(Files.java:2688)
>  at java.nio.file.Files.walkFileTree(Files.java:2742)
>  at org.apache.kafka.common.utils.Utils.delete(Utils.java:619)
>  at 
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:245)
>  at org.apache.kafka.streams.KafkaStreams$3.run(KafkaStreams.java:761)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6655) CleanupThread: Failed to lock the state directory due to an unexpected exception (Windows OS)

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6655:
---
Environment: Windows Operating System

> CleanupThread: Failed to lock the state directory due to an unexpected 
> exception (Windows OS)
> -
>
> Key: KAFKA-6655
> URL: https://issues.apache.org/jira/browse/KAFKA-6655
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: Windows Operating System
>Reporter: Srini
>Priority: Major
>
> CleanupThread] Failed to lock the state directory due to an unexpected 
> exception
> java.nio.file.DirectoryNotEmptyException: 
> \tmp\kafka-streams\srini-20171208\0_9
>  at 
> sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
>  at 
> sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>  at java.nio.file.Files.delete(Files.java:1126)
>  at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:636)
>  at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:619)
>  at java.nio.file.Files.walkFileTree(Files.java:2688)
>  at java.nio.file.Files.walkFileTree(Files.java:2742)
>  at org.apache.kafka.common.utils.Utils.delete(Utils.java:619)
>  at 
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:245)
>  at org.apache.kafka.streams.KafkaStreams$3.run(KafkaStreams.java:761)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6655) CleanupThread: Failed to lock the state directory due to an unexpected exception (Windows OS)

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6655:
---
Summary: CleanupThread: Failed to lock the state directory due to an 
unexpected exception (Windows OS)  (was: CleanupThread] Failed to lock the 
state directory due to an unexpected exception)

> CleanupThread: Failed to lock the state directory due to an unexpected 
> exception (Windows OS)
> -
>
> Key: KAFKA-6655
> URL: https://issues.apache.org/jira/browse/KAFKA-6655
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Srini
>Priority: Major
>
> CleanupThread] Failed to lock the state directory due to an unexpected 
> exception
> java.nio.file.DirectoryNotEmptyException: 
> \tmp\kafka-streams\srini-20171208\0_9
>  at 
> sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
>  at 
> sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
>  at java.nio.file.Files.delete(Files.java:1126)
>  at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:636)
>  at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:619)
>  at java.nio.file.Files.walkFileTree(Files.java:2688)
>  at java.nio.file.Files.walkFileTree(Files.java:2742)
>  at org.apache.kafka.common.utils.Utils.delete(Utils.java:619)
>  at 
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:245)
>  at org.apache.kafka.streams.KafkaStreams$3.run(KafkaStreams.java:761)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4392) Failed to lock the state directory due to an unexpected exception

2018-03-15 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400735#comment-16400735
 ] 

Matthias J. Sax commented on KAFKA-4392:


https://issues.apache.org/jira/browse/KAFKA-6647 seems to be a related issue. 
I also did a PR that includes some improvements. Atm, we are not able to 
reproduce the issue though. It seems to be related to the different behavior of 
Linux and Windows (unfortunately, all our testing is done on Linux machines 
atm), so it's hard to detect whether Linux/Windows behave differently. In order 
to keep concerns separated, I'll reopen KAFKA-6655 and point out that that 
ticket relates to Windows.
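
The startup failure reported below boils down to a race between the cleaner 
thread deleting a task directory and another thread creating the .lock file 
inside it. A minimal Java sketch of that race (directory and file names are 
illustrative; either thread can win on any given run):
{code:java}
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class LockFileRaceSketch {
    public static void main(String[] args) throws Exception {
        Path taskDir = Files.createTempDirectory("7_21");
        Thread cleaner = new Thread(() -> {
            try {
                // Cleaner removes the task directory...
                Files.deleteIfExists(taskDir.resolve(".lock"));
                Files.delete(taskDir);
            } catch (Exception e) {
                System.out.println("cleaner: " + e);
            }
        });
        cleaner.start();
        try {
            // ...while this thread tries to create the .lock inside it. If
            // the directory vanished first, CREATE fails with
            // java.nio.file.NoSuchFileException, as in the log below.
            FileChannel channel = FileChannel.open(taskDir.resolve(".lock"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            channel.close();
            System.out.println("locker won the race");
        } catch (NoSuchFileException e) {
            System.out.println("locker lost the race: " + e);
        }
        cleaner.join();
    }
}
{code}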

> Failed to lock the state directory due to an unexpected exception
> -
>
> Key: KAFKA-4392
> URL: https://issues.apache.org/jira/browse/KAFKA-4392
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Ara Ebrahimi
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 0.10.2.0
>
>
> This happened on streaming startup, on a clean installation, no existing 
> folder. Here I was starting 4 instances of our streaming app on 4 machines 
> and one threw this exception. Seems to me there’s a race condition somewhere 
> when instances discover others, or something like that.
> 2016-11-02 15:43:47 INFO  StreamRunner:59 - Started http server successfully.
> 2016-11-02 15:44:50 ERROR StateDirectory:147 - Failed to lock the state 
> directory due to an unexpected exception
> java.nio.file.NoSuchFileException: 
> /data/1/kafka-streams/myapp-streams/7_21/.lock
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>   at 
> sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:177)
>   at java.nio.channels.FileChannel.open(FileChannel.java:287)
>   at java.nio.channels.FileChannel.open(FileChannel.java:335)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.getOrCreateFileChannel(StateDirectory.java:176)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:90)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:140)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:552)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:459)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> ^C
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/7_21/
> ls: cannot access /data/1/kafka-streams/myapp-streams/7_21/: No such file or 
> directory
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/
> total 4
> drwxr-xr-x 74 root root 4096 Nov  2 15:44 .
> drwxr-xr-x  3 root root   27 Nov  2 15:43 ..
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_1
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_13
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_14
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_16
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_2
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_22
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_28
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_3
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_31
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_5
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_7
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_8
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_9
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_1
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_10
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_14
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_15
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_16
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_17
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_18
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_3
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_5
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_1
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_10
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_12
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_20
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_24
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_10
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_11
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_19
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_20
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_25
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_26
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_3
> drwxr-xr-x  3 root root   64 Nov  2 15:43 4_11
> drwxr-xr-x  3 root root   64 

[jira] [Commented] (KAFKA-6351) libs directory has duplicate javassist jars

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400488#comment-16400488
 ] 

ASF GitHub Bot commented on KAFKA-6351:
---

omkreddy opened a new pull request #4719: KAFKA-6351: Exclude javassist library 
to Kafka distribution from tools project
URL: https://github.com/apache/kafka/pull/4719
 
 
   -  The javassist:3.21.0-GA library comes from the :connect:runtime project and 
the javassist:3.20.0-GA library comes from the :tools project. This PR excludes 
copying javassist while creating the Kafka distribution.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> libs directory has duplicate javassist jars
> ---
>
> Key: KAFKA-6351
> URL: https://issues.apache.org/jira/browse/KAFKA-6351
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 1.0.0
>Reporter: pre sto
>Priority: Minor
>
> Downloaded kafka_2.11-1.0.0 and noticed duplicate jars under libs
> javassist-3.20.0-GA.jar
> javassist-3.21.0-GA.jar
> I assume that's a mistake



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4392) Failed to lock the state directory due to an unexpected exception

2018-03-15 Thread Srini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400336#comment-16400336
 ] 

Srini commented on KAFKA-4392:
--

[~mjsax] Yes, I did set the path and can view the contents. In fact, I created 
/tmp as well. The path is not the issue, since Java respects Unix-style paths on 
Windows. My guess is org.apache.kafka.common.utils.Utils.delete is unable to 
delete the ".lock" or ".checkpoint" file(s).

@[~habdank] Thanks, yes I am aware. Unfortunately, that will not work since we 
are implementing a stateful store for a 24/7, four-nines service and need the 
app to recover gracefully in case of a crash.

 

> Failed to lock the state directory due to an unexpected exception
> -
>
> Key: KAFKA-4392
> URL: https://issues.apache.org/jira/browse/KAFKA-4392
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Ara Ebrahimi
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 0.10.2.0
>
>
> This happened on streaming startup, on a clean installation, no existing 
> folder. Here I was starting 4 instances of our streaming app on 4 machines 
> and one threw this exception. Seems to me there’s a race condition somewhere 
> when instances discover others, or something like that.
> 2016-11-02 15:43:47 INFO  StreamRunner:59 - Started http server successfully.
> 2016-11-02 15:44:50 ERROR StateDirectory:147 - Failed to lock the state 
> directory due to an unexpected exception
> java.nio.file.NoSuchFileException: 
> /data/1/kafka-streams/myapp-streams/7_21/.lock
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>   at 
> sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:177)
>   at java.nio.channels.FileChannel.open(FileChannel.java:287)
>   at java.nio.channels.FileChannel.open(FileChannel.java:335)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.getOrCreateFileChannel(StateDirectory.java:176)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:90)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:140)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:552)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:459)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> ^C
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/7_21/
> ls: cannot access /data/1/kafka-streams/myapp-streams/7_21/: No such file or 
> directory
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/
> total 4
> drwxr-xr-x 74 root root 4096 Nov  2 15:44 .
> drwxr-xr-x  3 root root   27 Nov  2 15:43 ..
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_1
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_13
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_14
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_16
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_2
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_22
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_28
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_3
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_31
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_5
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_7
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_8
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_9
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_1
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_10
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_14
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_15
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_16
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_17
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_18
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_3
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_5
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_1
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_10
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_12
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_20
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_24
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_10
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_11
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_19
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_20
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_25
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_26
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_3
> drwxr-xr-x  3 root root   64 Nov  2 15:43 4_11
> drwxr-xr-x  3 root root   64 Nov  2 15:43 4_12
> 

[jira] [Created] (KAFKA-6663) Expression for GlobalKTable is not correct

2018-03-15 Thread huxihx (JIRA)
huxihx created KAFKA-6663:
-

 Summary: Expression for GlobalKTable is not correct
 Key: KAFKA-6663
 URL: https://issues.apache.org/jira/browse/KAFKA-6663
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: huxihx


In [this stream doc 
section|https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api#creating-source-streams-from-kafka],
  when reading records from Kafka to a global KTable, the doc says:
`In the case of a GlobalKTable, the local GlobalKTable instance of every 
application instance will be populated with data from only a *subset* of the 
partitions of the input topic. Collectively, across all application instances, 
all input topic partitions are read and processed.`

Is this correct? Does each GlobalKTable instance really get assigned only a 
subset of the partitions of the input topic? I believe it should be able to 
consume all the partitions of the input topic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean

2018-03-15 Thread George Bloggs (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400125#comment-16400125
 ] 

George Bloggs commented on KAFKA-6647:
--

As mentioned earlier, in our code I have implemented the following and call it 
directly before kafkaStreams.start(). It works and clears the directory, which 
highlights that the issue is within the lock functionality:

{code:java}
 private void ourCleanUp() {
final File baseDir = new 
File(streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG));
final File stateDir = new File(baseDir, 
streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
try {
Utils.delete(stateDir);
} catch (IOException e) {
LOGGER.error("Arrggh!! ourCleanUp failed!", e);
}
}
{code}


> KafkaStreams.cleanUp creates .lock file in directory its trying to clean
> 
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream, the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>   if (lock(id, 0)) {
> long now = time.milliseconds();
> long lastModifiedMs = taskDir.lastModified();
> if (now > lastModifiedMs + cleanupDelayMs) {
> log.info("{} Deleting obsolete state directory {} 
> for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), 
> dirName, id, now - lastModifiedMs, cleanupDelayMs);
> Utils.delete(taskDir);
> }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that 
> is subsequently going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete the .lock file fails with 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory  
>  : stream-thread [restartedMain] Failed to lock the state directory due 
> to an unexpected exception)
> This seems to then cause issues using streams from a topic to an in-memory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4392) Failed to lock the state directory due to an unexpected exception

2018-03-15 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400055#comment-16400055
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-4392:
---

I see it on my Windows workstation as well.

The workaround is to clear, before Kafka starts, all files stored in a folder 
like _/data/1/kafka-streams/myapp-streams/_ .

This is only a workaround, because if the service really runs 24/7 it will 
lose its complete state and messages might be lost :-(.
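
A minimal sketch of that workaround (the path is an example; recursive delete 
via java.nio):

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.*;
import java.util.Comparator;
import java.util.stream.Stream;

public class ClearStateDir {
    public static void main(String[] args) throws IOException {
        // Assumed location: <state.dir>/<application.id>
        Path stateDir = Paths.get("/data/1/kafka-streams/myapp-streams");
        if (Files.exists(stateDir)) {
            try (Stream<Path> paths = Files.walk(stateDir)) {
                paths.sorted(Comparator.reverseOrder())      // children before parents
                     .forEach(p -> {
                         try {
                             Files.delete(p);
                         } catch (IOException e) {
                             throw new UncheckedIOException(e);
                         }
                     });
            }
        }
    }
}
{code}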


> Failed to lock the state directory due to an unexpected exception
> -
>
> Key: KAFKA-4392
> URL: https://issues.apache.org/jira/browse/KAFKA-4392
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Ara Ebrahimi
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 0.10.2.0
>
>
> This happened on streaming startup, on a clean installation, no existing 
> folder. Here I was starting 4 instances of our streaming app on 4 machines 
> and one threw this exception. Seems to me there’s a race condition somewhere 
> when instances discover others, or something like that.
> 2016-11-02 15:43:47 INFO  StreamRunner:59 - Started http server successfully.
> 2016-11-02 15:44:50 ERROR StateDirectory:147 - Failed to lock the state 
> directory due to an unexpected exception
> java.nio.file.NoSuchFileException: 
> /data/1/kafka-streams/myapp-streams/7_21/.lock
>   at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>   at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>   at 
> sun.nio.fs.UnixFileSystemProvider.newFileChannel(UnixFileSystemProvider.java:177)
>   at java.nio.channels.FileChannel.open(FileChannel.java:287)
>   at java.nio.channels.FileChannel.open(FileChannel.java:335)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.getOrCreateFileChannel(StateDirectory.java:176)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:90)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:140)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:552)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:459)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> ^C
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/7_21/
> ls: cannot access /data/1/kafka-streams/myapp-streams/7_21/: No such file or 
> directory
> [arae@a4 ~]$ ls -al /data/1/kafka-streams/myapp-streams/
> total 4
> drwxr-xr-x 74 root root 4096 Nov  2 15:44 .
> drwxr-xr-x  3 root root   27 Nov  2 15:43 ..
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_1
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_13
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_14
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_16
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_2
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_22
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_28
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_3
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_31
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_5
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_7
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_8
> drwxr-xr-x  3 root root   32 Nov  2 15:43 0_9
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_1
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_10
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_14
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_15
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_16
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_17
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_18
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_3
> drwxr-xr-x  3 root root   32 Nov  2 15:43 1_5
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_1
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_10
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_12
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_20
> drwxr-xr-x  3 root root   60 Nov  2 15:43 2_24
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_10
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_11
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_19
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_20
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_25
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_26
> drwxr-xr-x  3 root root   61 Nov  2 15:43 3_3
> drwxr-xr-x  3 root root   64 Nov  2 15:43 4_11
> drwxr-xr-x  3 root root   64 Nov  2 15:43 4_12
> drwxr-xr-x  3 root root   64 Nov  2 15:43 4_18
> drwxr-xr-x  3 root root   64 Nov  2 15:43 4_19
> drwxr-xr-x  3 root root   

[jira] [Resolved] (KAFKA-6653) Delayed operations may not be completed when there is lock contention

2018-03-15 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6653.

   Resolution: Fixed
Fix Version/s: 1.1.0

> Delayed operations may not be completed when there is lock contention
> -
>
> Key: KAFKA-6653
> URL: https://issues.apache.org/jira/browse/KAFKA-6653
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
>
> If there is lock contention while multiple threads check if a delayed 
> operation may be completed (e.g. a produce request with acks=-1), only the 
> thread that acquires the lock without blocking attempts to complete the 
> operation. This change was made to avoid deadlocks under KAFKA-5970. But this 
> leaves a timing window when an operation becomes ready to complete after 
> another thread has acquired the lock and performed the check for completion, 
> but not yet released the lock. In this case, the operation may never be 
> completed and will timeout unless there are other operations with the same 
> key. The timeout was observed in a failed system test where a produce request 
> timed out, causing the test failure.
>  
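> To make the window concrete, here is a simplified sketch of the pre-fix pattern 
> (illustrative Java, not the actual Scala code): only the thread that wins 
> tryLock() checks for completion, so a thread that satisfies the completion 
> criteria while the lock is held simply returns, and nothing forces a re-check.
> {code:java}
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.function.BooleanSupplier;
> 
> class RacyCompletion {
>     private final ReentrantLock lock = new ReentrantLock();
>     private final BooleanSupplier tryComplete;   // e.g. "have all acks arrived?"
> 
>     RacyCompletion(BooleanSupplier tryComplete) { this.tryComplete = tryComplete; }
> 
>     boolean maybeTryComplete() {
>         if (lock.tryLock()) {
>             try {
>                 return tryComplete.getAsBoolean();   // threadA checks here...
>             } finally {
>                 lock.unlock();
>             }
>         }
>         // ...threadB arrives here after satisfying the completion criteria but
>         // while threadA still holds the lock. threadB gives up, threadA's check
>         // already happened, so the operation can only complete via its timeout.
>         return false;
>     }
> }
> {code}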



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6653) Delayed operations may not be completed when there is lock contention

2018-03-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16400019#comment-16400019
 ] 

ASF GitHub Bot commented on KAFKA-6653:
---

hachikuji closed pull request #4704: KAFKA-6653: Complete delayed operations 
even when there is lock contention
URL: https://github.com/apache/kafka/pull/4704
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 894d30e2d7c..2a096e1a811 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -47,6 +47,7 @@ abstract class DelayedOperation(override val delayMs: Long,
                                 lockOpt: Option[Lock] = None) extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
+  private val tryCompletePending = new AtomicBoolean(false)
   // Visible for testing
   private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
 
@@ -101,16 +102,38 @@ abstract class DelayedOperation(override val delayMs: Long,
   /**
    * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
    * without blocking.
+   *
+   * If threadA acquires the lock and performs the check for completion before completion criteria is met
+   * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
+   * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
+   * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
+   * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
+   * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
+   * the operation is actually completed.
    */
   private[server] def maybeTryComplete(): Boolean = {
-    if (lock.tryLock()) {
-      try {
-        tryComplete()
-      } finally {
-        lock.unlock()
+    var retry = false
+    var done = false
+    do {
+      if (lock.tryLock()) {
+        try {
+          tryCompletePending.set(false)
+          done = tryComplete()
+        } finally {
+          lock.unlock()
+        }
+        // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
+        // `tryCompletePending`. In this case we should retry.
+        retry = tryCompletePending.get()
+      } else {
+        // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
+        // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
+        // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
+        // released the lock and returned by the time the flag is set.
+        retry = !tryCompletePending.getAndSet(true)
       }
-    } else
-      false
+    } while (!isCompleted && retry)
+    done
   }
 
   /*
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index d4d79e554c7..3b077a0b438 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -17,11 +17,13 @@
 
 package kafka.server
 
-import java.util.concurrent.{Executors, Future}
+import java.util.Random
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
 
 import kafka.utils.CoreUtils.inLock
-
+import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.Time
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
@@ -29,6 +31,7 @@ import org.junit.Assert._
 class DelayedOperationTest {
 
   var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
+  var executorService: ExecutorService = null
 
   @Before
   def setUp() {
@@ -38,6 +41,8 @@ class DelayedOperationTest {
   @After
   def tearDown() {
     purgatory.shutdown()
+    if (executorService != null)
+      executorService.shutdown()
   }
 
   @Test
@@ -122,6 +127,94 @@ class DelayedOperationTest {
     assertEquals(Nil, cancelledOperations)
   }
 
+  /**
+   * Verify that if there is lock contention between two threads attempting to complete,
+   * completion is performed without any blocking in either thread.
+   */
+  @Test
+  def testTryCompleteLockContention(): Unit = {
+    executorService = Executors.newSingleThreadExecutor()
+

[jira] [Issue Comment Deleted] (KAFKA-6662) Consumer use offsetsForTimes() get offset return None.

2018-03-15 Thread Matt Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matt Wang updated KAFKA-6662:
-
Comment: was deleted

(was: the pr is:https://github.com/apache/kafka/pull/4717)

> Consumer use offsetsForTimes() get offset return None.
> --
>
> Key: KAFKA-6662
> URL: https://issues.apache.org/jira/browse/KAFKA-6662
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0
>Reporter: Matt Wang
>Priority: Minor
>
> When we use the consumer's offsetsForTimes() method to get a topic-partition 
> offset, it sometimes returns null. Printing the client log:
> {code:java}
> [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop 
> interval 256 upload 0 retry 0 fail 0 
> (com.meituan.mtrace.collector.sg.AbstractCollector)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. 
> (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: 
> (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 
> to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 
> [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 
> [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 
> [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 
> [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], 
> LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], 
> DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], 
> SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], 
> CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) 
> (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for 
> org.matt_test2-0. Fetched offset -1, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.Fetcher){code}
> From the log, we find the broker returns the offset, but its value is -1; this 
> value is then discarded in Fetcher.handleListOffsetResponse():
> {code:java}
> // Handle v1 and later response
> log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
>         topicPartition, partitionData.offset, partitionData.timestamp);
> if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
>     OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
>     timestampOffsetMap.put(topicPartition, offsetData);
> }{code}
> We tested several situations and found that in the following two cases it 
> returns none.
>  # The topic-partition contains 0 messages; when we use offsetsForTimes() to 
> get the offset, the broker returns -1;
>  # The target time used to find the offset is larger than the partition's 
> active segment largestTimestamp; the broker returns -1;
> If the offset is -1, it is not returned to the consumer client. I think in 
> these situations the latest offset should be returned, which is also what the 
> scaladoc in kafka/core (quoted below) says.
> {code:java}
> /**
>  * Search the message offset based on timestamp.
>  * This method returns an option of TimestampOffset. The offset is the offset 
>  * of the first message whose timestamp is greater than or equals to the 
>  * target timestamp.
>  *
>  * If all the messages in the segment have smaller timestamps, the returned 
>  * offset will be last offset + 1 and the timestamp will be max timestamp in 
>  * the segment.
>  *
>  * If all the messages in the segment have larger timestamps, or no message in 
>  * the segment has a timestamp, the returned offset will be the base offset of 
>  * the segment and the timestamp will be Message.NoTimestamp.
>  *
>  * This method only returns None when the log is not empty but we did not see 
>  * any messages when scanning the log from the indexed position. This could 
>  * happen if the log is truncated after we get the indexed position but before 
>  * we scan the log from there. In this case we simply return None and the 
>  * caller will need to check on the truncated log and maybe retry or even do 
>  * the search on another log segment.
>  *
>  * @param timestamp The timestamp to search for.
>  * @return the timestamp and offset of the first message whose timestamp is 
> larger than or 

[jira] [Commented] (KAFKA-6662) Consumer use offsetsForTimes() get offset return None.

2018-03-15 Thread Matt Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399981#comment-16399981
 ] 

Matt Wang commented on KAFKA-6662:
--

The PR is: https://github.com/apache/kafka/pull/4717
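
For reference, a consumer-side sketch of the symptom and one possible fallback 
(the topic name, the targetTimestampMs variable, and the fallback policy are 
illustrative assumptions):

{code:java}
// Sketch only: "consumer" and "targetTimestampMs" are assumed to already exist.
TopicPartition tp = new TopicPartition("org.matt_test2", 0);
Map<TopicPartition, OffsetAndTimestamp> result =
        consumer.offsetsForTimes(Collections.singletonMap(tp, targetTimestampMs));

OffsetAndTimestamp oat = result.get(tp);
if (oat == null) {
    // Happens when the partition is empty, or when targetTimestampMs is larger
    // than the active segment's largestTimestamp: the broker answers -1 and the
    // Fetcher drops the entry. One possible fallback is seeking to the log end.
    consumer.seekToEnd(Collections.singleton(tp));
} else {
    consumer.seek(tp, oat.offset());
}
{code}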

> Consumer use offsetsForTimes() get offset return None.
> --
>
> Key: KAFKA-6662
> URL: https://issues.apache.org/jira/browse/KAFKA-6662
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0
>Reporter: Matt Wang
>Priority: Minor
>
> When we use the consumer's offsetsForTimes() method to get a topic-partition 
> offset, it sometimes returns null. Printing the client log:
> {code:java}
> [2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop 
> interval 256 upload 0 retry 0 fail 0 
> (com.meituan.mtrace.collector.sg.AbstractCollector)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE 
> (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
> [2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. 
> (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: 
> (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 
> to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 
> [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 
> [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 
> [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 
> [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], 
> LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], 
> DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], 
> SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], 
> CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) 
> (org.apache.kafka.clients.NetworkClient)
> [2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for 
> org.matt_test2-0. Fetched offset -1, timestamp -1 
> (org.apache.kafka.clients.consumer.internals.Fetcher){code}
> From the log, we find the broker returns the offset, but its value is -1; this 
> value is then discarded in Fetcher.handleListOffsetResponse():
> {code:java}
> // Handle v1 and later response
> log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
>         topicPartition, partitionData.offset, partitionData.timestamp);
> if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
>     OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
>     timestampOffsetMap.put(topicPartition, offsetData);
> }{code}
> We tested several situations and found that in the following two cases it 
> returns none.
>  # The topic-partition contains 0 messages; when we use offsetsForTimes() to 
> get the offset, the broker returns -1;
>  # The target time used to find the offset is larger than the partition's 
> active segment largestTimestamp; the broker returns -1;
> If the offset is -1, it is not returned to the consumer client. I think in 
> these situations the latest offset should be returned, which is also what the 
> scaladoc in kafka/core (quoted below) says.
> {code:java}
> /**
>  * Search the message offset based on timestamp.
>  * This method returns an option of TimestampOffset. The offset is the offset 
>  * of the first message whose timestamp is greater than or equals to the 
>  * target timestamp.
>  *
>  * If all the messages in the segment have smaller timestamps, the returned 
>  * offset will be last offset + 1 and the timestamp will be max timestamp in 
>  * the segment.
>  *
>  * If all the messages in the segment have larger timestamps, or no message in 
>  * the segment has a timestamp, the returned offset will be the base offset of 
>  * the segment and the timestamp will be Message.NoTimestamp.
>  *
>  * This method only returns None when the log is not empty but we did not see 
>  * any messages when scanning the log from the indexed position. This could 
>  * happen if the log is truncated after we get the indexed position but before 
>  * we scan the log from there. In this case we simply return None and the 
>  * caller will need to check on the truncated log and maybe retry or even do 
>  * the search on another log segment.
>  *
>  * @param timestamp The timestamp to search for.
>  * @return the timestamp and offset of the first message whose timestamp is 
> larger 

[jira] [Created] (KAFKA-6662) Consumer use offsetsForTimes() get offset return None.

2018-03-15 Thread Matt Wang (JIRA)
Matt Wang created KAFKA-6662:


 Summary: Consumer use offsetsForTimes() get offset return None.
 Key: KAFKA-6662
 URL: https://issues.apache.org/jira/browse/KAFKA-6662
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.2.0
Reporter: Matt Wang


When we use the consumer's offsetsForTimes() method to get a topic-partition 
offset, it sometimes returns null. Printing the client log:
{code:java}
[2018-03-15 11:54:05,239] DEBUG Collector TraceCollector dispatcher loop 
interval 256 upload 0 retry 0 fail 0 
(com.meituan.mtrace.collector.sg.AbstractCollector)
[2018-03-15 11:54:05,241] DEBUG Set SASL client state to INITIAL 
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2018-03-15 11:54:05,241] DEBUG Set SASL client state to INTERMEDIATE 
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2018-03-15 11:54:05,247] DEBUG Set SASL client state to COMPLETE 
(org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2018-03-15 11:54:05,247] DEBUG Initiating API versions fetch from node 53. 
(org.apache.kafka.clients.NetworkClient)
[2018-03-15 11:54:05,253] DEBUG Recorded API versions for node 53: (Produce(0): 
0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 
1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], 
StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], 
ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], 
OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], 
JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], 
LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], 
DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], 
SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], 
CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) 
(org.apache.kafka.clients.NetworkClient)
[2018-03-15 11:54:05,315] DEBUG Handling ListOffsetResponse response for 
org.matt_test2-0. Fetched offset -1, timestamp -1 
(org.apache.kafka.clients.consumer.internals.Fetcher){code}
From the log, we find the broker returns the offset, but its value is -1; this 
value is then discarded in Fetcher.handleListOffsetResponse():
{code:java}
// Handle v1 and later response
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
        topicPartition, partitionData.offset, partitionData.timestamp);
if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) {
    OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp);
    timestampOffsetMap.put(topicPartition, offsetData);
}{code}
We tested several situations and found that in the following two cases it 
returns none.
 # The topic-partition contains 0 messages; when we use offsetsForTimes() to 
get the offset, the broker returns -1;
 # The target time used to find the offset is larger than the partition's 
active segment largestTimestamp; the broker returns -1;

If the offset is -1, it is not returned to the consumer client. I think in 
these situations the latest offset should be returned, which is also what the 
scaladoc in kafka/core (quoted below) says.
{code:java}
/**
 * Search the message offset based on timestamp.
 * This method returns an option of TimestampOffset. The offset is the offset of 
 * the first message whose timestamp is greater than or equals to the target 
 * timestamp.
 *
 * If all the messages in the segment have smaller timestamps, the returned 
 * offset will be last offset + 1 and the timestamp will be max timestamp in 
 * the segment.
 *
 * If all the messages in the segment have larger timestamps, or no message in 
 * the segment has a timestamp, the returned offset will be the base offset of 
 * the segment and the timestamp will be Message.NoTimestamp.
 *
 * This method only returns None when the log is not empty but we did not see 
 * any messages when scanning the log from the indexed position. This could 
 * happen if the log is truncated after we get the indexed position but before 
 * we scan the log from there. In this case we simply return None and the 
 * caller will need to check on the truncated log and maybe retry or even do 
 * the search on another log segment.
 *
 * @param timestamp The timestamp to search for.
 * @return the timestamp and offset of the first message whose timestamp is 
 * larger than or equals to the target timestamp. None will be returned if 
 * there is no such message.
 */
def findOffsetByTimestamp(timestamp: Long): Option[TimestampOffset] = {
  // Get the index entry with a timestamp less than or equal to the target timestamp
  val timestampOffset = timeIndex.lookup(timestamp)
  val position = index.lookup(timestampOffset.offset).position
  // Search the timestamp
  log.searchForTimestamp(timestamp, 

[jira] [Updated] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6054:
---
Description: 
KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]


We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
upgrade of the app, so that at one point both 0.10.0.0-based instances and 
0.10.2.1-based instances were running.

We observed the following stack trace:
{code:java}
2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo -
unable to decode subscription data: version=2
org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
subscription data: version=2
at 
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

{code}
I spoke with [~mjsax] and he said this is a known issue that happens when you 
have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
because the internal version number of the protocol changed when adding 
Interactive Queries. Matthias asked me to file this JIRA.
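
For context, a sketch of the two-rolling-bounce upgrade path that KIP-268 
proposes (hedged: the "upgrade.from" config comes from the KIP draft, is not 
available in the affected releases, and its final name and accepted values may 
differ):

{code:java}
// Sketch of the KIP-268 two-bounce rolling upgrade; "upgrade.from" is the
// config name proposed by the KIP (an assumption here, not a released API).
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");            // assumed id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // assumed broker

// Bounce 1: deploy new jars but keep emitting the old subscription metadata
// version so mixed old/new instances can still decode each other.
props.put("upgrade.from", "0.10.0");

// Bounce 2: remove "upgrade.from" so all instances switch to the new
// metadata version once every instance runs the new code.
{code}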

  was:
We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
upgrade of the app, so that at one point both 0.10.0.0-based instances and 
0.10.2.1-based instances were running. 

We observed the following stack trace:

{code}
2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo -
unable to decode subscription data: version=2
org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
subscription data: version=2
at 
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
at 

[jira] [Updated] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6054:
---
Fix Version/s: 1.2.0

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so that at one point both 0.10.0.0-based instances and 
> 0.10.2.1-based instances were running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
> because the internal version number of the protocol changed when adding 
> Interactive Queries. Matthias asked me to file this JIRA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-15 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6054:
---
Labels: kip  (was: needs-kip)

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so that at one point both 0.10.0.0-based instances and 
> 0.10.2.1-based instances were running. 
> We observed the following stack trace:
> {code}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
> because the internal version number of the protocol changed when adding 
> Interactive Queries. Matthias asked me to file this JIRA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)