[jira] [Updated] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has LIMITED retention time

2018-03-12 Thread Navinder Brar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6643:
-
Description: 
In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas at 1, we still have 
more availability for persistent state stores, as changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now if we have a year's data in persistent stores (RocksDB), we don't want the 
changelog topics to hold a year's data as well, as it puts an unnecessary burden 
on the brokers (in terms of space). If we have to scale our Kafka Streams 
application (having 200-300 TB of data) we have to scale the Kafka brokers as 
well. We want to reduce this dependency and find ways to use the changelog topic 
just as a queue, holding only 2 or 3 days of data, and warm up the replicas from 
scratch in some other way.

I have a few proposals in that respect.
1. Use a new Kafka topic per partition that needs to be warmed up on the fly 
(when the node containing that partition crashes). Produce into this topic from 
another replica/active and build the new replica from this topic.
2. Use peer-to-peer file transfer, as RocksDB can create backups which can be 
transferred from the source node to the destination node when a new replica has 
to be built from scratch.
3. Use HDFS as an intermediate store instead of a Kafka topic, where we can keep 
scheduled backups for each partition and use those to build new replicas.

  was:
In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas at 1, we still have 
more availability for persistent state stores, as changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now if we have a year's data in persistent stores (RocksDB), we don't want the 
changelog topics to hold a year's data as well, as it puts an unnecessary burden 
on the brokers (in terms of space). If we have to scale our Kafka Streams 
application (having 200-300 TB of data) we have to scale the Kafka brokers as 
well. We want to reduce this dependency and find ways to use the changelog topic 
just as a queue, holding only 2 or 3 days of data, and warm up the replicas from 
scratch in some other way.

I have a few proposals in that respect.
1. Use a new kafka topic related to each partition whi


> Warm up new replicas from scratch when changelog topic has LIMITED retention 
> time
> -
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> In the current scenario, Kafka Streams has changelog Kafka topics (internal 
> topics holding all the data for the store) which are used to build the state 
> of replicas. So, if we keep the number of standby replicas at 1, we still 
> have more availability for persistent state stores, as changelog Kafka topics 
> are also replicated depending upon the broker replication policy, but that 
> also means we are using at least 4 times the space (1 master store, 1 replica 
> store, 1 changelog, 1 changelog replica). 
> Now if we have a year's data in persistent stores (RocksDB), we don't want 
> the changelog topics to hold a year's data as well, as it puts an unnecessary 
> burden on the brokers (in terms of space). If we have to scale our Kafka 
> Streams application (having 200-300 TB of data) we have to scale the Kafka 
> brokers as well. We want to reduce this dependency and find ways to use the 
> changelog topic just as a queue, holding only 2 or 3 days of data, and warm 
> up the replicas from scratch in some other way.
> I have a few proposals in that respect.
> 1. Use a new Kafka topic per partition that needs to be warmed up on the fly 
> (when the node containing that partition crashes). Produce into this topic 
> from another replica/active and build the new replica from this topic.
> 2. Use peer-to-peer file transfer, as RocksDB can create backups which can be 
> transferred from the source node to the destination node when a new replica 
> has to be built from scratch.
> 3. Use HDFS as an intermediate store instead of a Kafka topic, where we can 
> keep scheduled backups for each partition and use those to build new replicas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Updated] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has LIMITED retention time

2018-03-12 Thread Navinder Brar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6643:
-
Summary: Warm up new replicas from scratch when changelog topic has LIMITED 
retention time  (was: Warm up new replicas from scratch when changelog topic 
has retention time)

> Warm up new replicas from scratch when changelog topic has LIMITED retention 
> time
> -
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> In the current scenario, Kafka Streams has changelog Kafka topics (internal 
> topics holding all the data for the store) which are used to build the state 
> of replicas. So, if we keep the number of standby replicas at 1, we still 
> have more availability for persistent state stores, as changelog Kafka topics 
> are also replicated depending upon the broker replication policy, but that 
> also means we are using at least 4 times the space (1 master store, 1 replica 
> store, 1 changelog, 1 changelog replica). 
> Now if we have a year's data in persistent stores (RocksDB), we don't want 
> the changelog topics to hold a year's data as well, as it puts an unnecessary 
> burden on the brokers (in terms of space). If we have to scale our Kafka 
> Streams application (having 200-300 TB of data) we have to scale the Kafka 
> brokers as well. We want to reduce this dependency and find ways to use the 
> changelog topic just as a queue, holding only 2 or 3 days of data, and warm 
> up the replicas from scratch in some other way.
> I have a few proposals in that respect.
> 1. Use a new kafka topic related to each partition whi



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has retention time

2018-03-12 Thread Navinder Brar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6643:
-
Description: 
In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas at 1, we still have 
more availability for persistent state stores, as changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now if we have a year's data in persistent stores (RocksDB), we don't want the 
changelog topics to hold a year's data as well, as it puts an unnecessary burden 
on the brokers (in terms of space). If we have to scale our Kafka Streams 
application (having 200-300 TB of data) we have to scale the Kafka brokers as 
well. We want to reduce this dependency and find ways to use the changelog topic 
just as a queue, holding only 2 or 3 days of data, and warm up the replicas from 
scratch in some other way.

I have a few proposals in that respect.
1. Use a new kafka topic related to each partition whi

  was:
In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas at 1, we still have 
more availability for persistent state stores, as changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now if we have a year's data in persistent stores (RocksDB), we don't want the 
changelog topics to hold a year's data as well, as it puts an unnecessary burden 
on the brokers (in terms of space). If we have to scale our Kafka Streams 
application (having 200-300 TB of data) we have to scale the Kafka brokers as 
well. We want to reduce this dependency and find ways to use the changelog topic 
just as a queue, holding only 2 or 3 days of data, and warm up the replicas from 
scratch in some other way.


> Warm up new replicas from scratch when changelog topic has retention time
> -
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> In the current scenario, Kafka Streams has changelog Kafka topics (internal 
> topics holding all the data for the store) which are used to build the state 
> of replicas. So, if we keep the number of standby replicas at 1, we still 
> have more availability for persistent state stores, as changelog Kafka topics 
> are also replicated depending upon the broker replication policy, but that 
> also means we are using at least 4 times the space (1 master store, 1 replica 
> store, 1 changelog, 1 changelog replica). 
> Now if we have a year's data in persistent stores (RocksDB), we don't want 
> the changelog topics to hold a year's data as well, as it puts an unnecessary 
> burden on the brokers (in terms of space). If we have to scale our Kafka 
> Streams application (having 200-300 TB of data) we have to scale the Kafka 
> brokers as well. We want to reduce this dependency and find ways to use the 
> changelog topic just as a queue, holding only 2 or 3 days of data, and warm 
> up the replicas from scratch in some other way.
> I have a few proposals in that respect.
> 1. Use a new kafka topic related to each partition whi



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has retention time

2018-03-12 Thread Navinder Brar (JIRA)
Navinder Brar created KAFKA-6643:


 Summary: Warm up new replicas from scratch when changelog topic 
has retention time
 Key: KAFKA-6643
 URL: https://issues.apache.org/jira/browse/KAFKA-6643
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar


In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas at 1, we still have 
more availability for persistent state stores, as changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now if we have a year's data in persistent stores (RocksDB), we don't want the 
changelog topics to hold a year's data as well, as it puts an unnecessary burden 
on the brokers (in terms of space). If we have to scale our Kafka Streams 
application (having 200-300 TB of data) we have to scale the Kafka brokers as 
well. We want to reduce this dependency and find ways to use the changelog topic 
just as a queue, holding only 2 or 3 days of data, and warm up the replicas from 
scratch in some other way.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6624) log segment deletion could cause a disk to be marked offline incorrectly

2018-03-12 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6624.

   Resolution: Fixed
Fix Version/s: 1.1.0

> log segment deletion could cause a disk to be marked offline incorrectly
> 
>
> Key: KAFKA-6624
> URL: https://issues.apache.org/jira/browse/KAFKA-6624
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.1.0
>
>
> Saw the following log.
> [2018-03-06 23:12:20,721] ERROR Error while flushing log for topic1-0 in dir 
> /data01/kafka-logs with offset 80993 (kafka.server.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>         at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:379)
>         at 
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:163)
>         at 
> kafka.log.LogSegment$$anonfun$flush$1.apply$mcV$sp(LogSegment.scala:375)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at kafka.log.LogSegment.flush(LogSegment.scala:374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1373)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.log.Log$$anonfun$flush$1.apply$mcV$sp(Log.scala:1373)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.flush(Log.scala:1368)
>         at 
> kafka.log.Log$$anonfun$roll$2$$anonfun$apply$1.apply$mcV$sp(Log.scala:1343)
>         at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2018-03-06 23:12:20,722] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /data01/kafka-logs (kafka.server.ReplicaManager)
> It seems that topic1 was being deleted around the time when flushing was 
> called. Then flushing hit an IOException, which caused the disk to be marked 
> offline incorrectly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6624) log segment deletion could cause a disk to be marked offline incorrectly

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396554#comment-16396554
 ] 

ASF GitHub Bot commented on KAFKA-6624:
---

junrao closed pull request #4663: KAFKA-6624; Prevent concurrent log flush and 
log deletion
URL: https://github.com/apache/kafka/pull/4663
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 9ae93aadf06..7aa5bcd88d8 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -75,7 +75,8 @@ class LogManager(logDirs: Seq[File],
   // from one log directory to another log directory on the same broker. The 
directory of the future log will be renamed
   // to replace the current log of the partition after the future log catches 
up with the current log
   private val futureLogs = new Pool[TopicPartition, Log]()
-  private val logsToBeDeleted = new LinkedBlockingQueue[Log]()
+  // Each element in the queue contains the log object to be deleted and the 
time it is scheduled for deletion.
+  private val logsToBeDeleted = new LinkedBlockingQueue[(Log, Long)]()
 
   private val _liveLogDirs: ConcurrentLinkedQueue[File] = 
createAndValidateLogDirs(logDirs, initialOfflineDirs)
   @volatile var currentDefaultConfig = initialDefaultConfig
@@ -240,6 +241,10 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  private def addLogToBeDeleted(log: Log): Unit = {
+this.logsToBeDeleted.add((log, time.milliseconds()))
+  }
+
   private def loadLog(logDir: File, recoveryPoints: Map[TopicPartition, Long], 
logStartOffsets: Map[TopicPartition, Long]): Unit = {
 debug("Loading log '" + logDir.getName + "'")
 val topicPartition = Log.parseTopicPartitionName(logDir)
@@ -260,7 +265,7 @@ class LogManager(logDirs: Seq[File],
   logDirFailureChannel = logDirFailureChannel)
 
 if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
-  this.logsToBeDeleted.add(log)
+  addLogToBeDeleted(log)
 } else {
   val previous = {
 if (log.isFuture)
@@ -704,9 +709,12 @@ class LogManager(logDirs: Seq[File],
   private def deleteLogs(): Unit = {
 try {
   while (!logsToBeDeleted.isEmpty) {
-val removedLog = logsToBeDeleted.take()
+val (removedLog, scheduleTimeMs) = logsToBeDeleted.take()
 if (removedLog != null) {
   try {
+val waitingTimeMs = scheduleTimeMs + 
currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
+if (waitingTimeMs > 0)
+  Thread.sleep(waitingTimeMs)
 removedLog.delete()
 info(s"Deleted log for partition ${removedLog.topicPartition} in 
${removedLog.dir.getAbsolutePath}.")
   } catch {
@@ -767,7 +775,7 @@ class LogManager(logDirs: Seq[File],
 sourceLog.close()
 checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
 checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
-logsToBeDeleted.add(sourceLog)
+addLogToBeDeleted(sourceLog)
   } catch {
 case e: KafkaStorageException =>
   // If sourceLog's log directory is offline, we need close its 
handlers here.
@@ -805,7 +813,7 @@ class LogManager(logDirs: Seq[File],
   removedLog.renameDir(Log.logDeleteDirName(topicPartition))
   checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
   checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
-  logsToBeDeleted.add(removedLog)
+  addLogToBeDeleted(removedLog)
   info(s"Log for partition ${removedLog.topicPartition} is renamed to 
${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
 } else if (offlineLogDirs.nonEmpty) {
   throw new KafkaStorageException("Failed to delete log for " + 
topicPartition + " because it may be in one of the offline directories " + 
offlineLogDirs.mkString(","))
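
Restated outside the diff: the core of the patch is to remember when each log 
was scheduled for deletion and wait out file.delete.delay.ms before actually 
deleting it, so that in-flight operations such as a pending flush can finish 
first. A generic Java sketch of that pattern (class and method names are 
illustrative, not Kafka code):

{code:java}
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class DelayedDeletionQueue<T> {
    // Each entry pairs the item with the time at which deletion was scheduled,
    // mirroring the (Log, Long) pair introduced by the patch above.
    private final LinkedBlockingQueue<Map.Entry<T, Long>> toDelete = new LinkedBlockingQueue<>();
    private final long deleteDelayMs;

    public DelayedDeletionQueue(long deleteDelayMs) {
        this.deleteDelayMs = deleteDelayMs; // analogous to file.delete.delay.ms
    }

    public void schedule(T item) {
        toDelete.add(new SimpleEntry<>(item, System.currentTimeMillis()));
    }

    // Delete an item only after the configured delay has elapsed since it was
    // scheduled, so concurrent work on the item has time to complete before the
    // underlying files actually disappear.
    public void drainAndDelete(Consumer<T> deleteAction) throws InterruptedException {
        while (!toDelete.isEmpty()) {
            Map.Entry<T, Long> entry = toDelete.take();
            long waitMs = entry.getValue() + deleteDelayMs - System.currentTimeMillis();
            if (waitMs > 0) {
                Thread.sleep(waitMs);
            }
            deleteAction.accept(entry.getKey());
        }
    }
}
{code}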


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> log segment deletion could cause a disk to be marked offline incorrectly
> 
>
> Key: KAFKA-6624
> URL: https://issues.apache.org/jira/browse/KAFKA-6624
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Assignee: Dong 

[jira] [Created] (KAFKA-6642) Rack aware replica assignment in kafka streams

2018-03-12 Thread Ashish Surana (JIRA)
Ashish Surana created KAFKA-6642:


 Summary: Rack aware replica assignment in kafka streams
 Key: KAFKA-6642
 URL: https://issues.apache.org/jira/browse/KAFKA-6642
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Ashish Surana


We have rack-aware replica assignment in the Kafka broker ([KIP-36 Rack aware replica 
assignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]).

This request is to have a similar feature for Kafka Streams applications. 
Standby task/standby replica assignment in Kafka Streams is currently not rack 
aware, and this request is to make it rack aware for better availability.
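
For reference, the broker-side rack tags from KIP-36 are already visible to 
clients; below is a small sketch of reading them via the admin client (the 
bootstrap address is a placeholder). A rack-aware standby assignment in Streams 
would additionally need comparable placement metadata for the application 
instances themselves:

{code:java}
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class BrokerRackInfo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // Each broker advertises its broker.rack (KIP-36), if configured.
            Collection<Node> nodes = admin.describeCluster().nodes().get();
            for (Node node : nodes) {
                System.out.println("broker " + node.id() + " -> rack "
                        + (node.hasRack() ? node.rack() : "<none>"));
            }
        }
    }
}
{code}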



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-03-12 Thread Ashish Surana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396531#comment-16396531
 ] 

Ashish Surana commented on KAFKA-6555:
--

+1. Let me capture all the approaches in a KIP, and we can have the discussion 
and feedback there.

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask 
> is in the RUNNING state. The idea is to make them queryable even in the 
> RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can 
> be huge and making the data inaccessible during this time could mean downtime 
> that is not acceptable for many applications.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica 
> task has to restore the remaining state from the changelog topic before it can 
> become RUNNING. The time taken for this depends on how much the replica is 
> lagging behind. During this restoration time the state store for that 
> partition is currently not queryable, resulting in downtime for the partition. 
> We can make the state store partition queryable for the data already present 
> in the state store.
>  # When there is no replica or standby task, the active task will be started 
> on one of the existing nodes. That node has to build the entire state from the 
> changelog topic, which can take a lot of time depending on how big the 
> changelog topic is, and keeping the state store non-queryable during this time 
> means downtime for the partition.
> This is a very important improvement, as it directly improves the availability 
> of microservices developed using Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
>  
>  
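
For context, a sketch of the retry loop callers typically need today, since the 
store only becomes queryable once its task reaches RUNNING; during restoration 
the call throws InvalidStateStoreException (store name and key/value types here 
are hypothetical):

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class QueryDuringRestore {

    // Blocks until the store is queryable; the waiting time is exactly the
    // restoration downtime this ticket proposes to remove.
    static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams)
            throws InterruptedException {
        while (true) {
            try {
                return streams.store("counts-store",
                        QueryableStoreTypes.<String, Long>keyValueStore());
            } catch (InvalidStateStoreException notReadyYet) {
                Thread.sleep(100); // store is still restoring or a rebalance is in flight
            }
        }
    }
}
{code}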



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6638) Leader should remove unassigned replica from ISR

2018-03-12 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6638:

Description: During partition reassignment, the controller may remove a replica 
from the replica set while still keeping the replica in the ISR set. One 
solution to fix this issue is to let the controller remove the replica from the 
ISR set as well. This requires more changes in the code because the controller 
needs to update the ZooKeeper node to change the ISR properly. Another 
solution, which keeps the controller simple, is to let the leader node remove 
unassigned replicas from the ISR set.

> Leader should remove unassigned replica from ISR
> 
>
> Key: KAFKA-6638
> URL: https://issues.apache.org/jira/browse/KAFKA-6638
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> During partition reassignment, the controller may remove a replica from the 
> replica set while still keeping the replica in the ISR set. One solution to 
> fix this issue is to let the controller remove the replica from the ISR set 
> as well. This requires more changes in the code because the controller needs 
> to update the ZooKeeper node to change the ISR properly. Another solution, 
> which keeps the controller simple, is to let the leader node remove 
> unassigned replicas from the ISR set.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6638) Leader should remove unassigned replica from ISR

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396325#comment-16396325
 ] 

ASF GitHub Bot commented on KAFKA-6638:
---

lindong28 opened a new pull request #4696: KAFKA-6638; Leader should remove 
unassigned replica from ISR
URL: https://github.com/apache/kafka/pull/4696
 
 
   During partition reassignment, the controller may remove a replica from the 
replica set while still keeping the replica in the ISR set. One solution to fix 
this issue is to let the controller remove the replica from the ISR set as 
well. This requires more changes in the code because the controller needs to 
update the ZooKeeper node to change the ISR properly. Another solution, which 
keeps the controller simple, is to let the leader node remove unassigned 
replicas from the ISR set.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Leader should remove unassigned replica from ISR
> 
>
> Key: KAFKA-6638
> URL: https://issues.apache.org/jira/browse/KAFKA-6638
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> During partition reassignment, the controller may remove a replica from the 
> replica set while still keeping the replica in the ISR set. One solution to 
> fix this issue is to let the controller remove the replica from the ISR set 
> as well. This requires more changes in the code because the controller needs 
> to update the ZooKeeper node to change the ISR properly. Another solution, 
> which keeps the controller simple, is to let the leader node remove 
> unassigned replicas from the ISR set.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6638) Leader should remove unassigned replica from ISR

2018-03-12 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6638:

Summary: Leader should remove unassigned replica from ISR  (was: Controller 
should remove replica from ISR if the replica is removed from the replica set)

> Leader should remove unassigned replica from ISR
> 
>
> Key: KAFKA-6638
> URL: https://issues.apache.org/jira/browse/KAFKA-6638
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6631) Kafka Streams - Rebalancing exception in Kafka 1.0.0

2018-03-12 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396307#comment-16396307
 ] 

Guozhang Wang commented on KAFKA-6631:
--

If you are certain that you no longer see the error complaining about 
{{RecordTooLargeException}} on the broker, then it may be caused by a different 
issue now. 

On the client side, do you see exactly the same stack trace, i.e. 
{{org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: The 
server experienced an unexpected error when processing the request}} from 
{{org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle}}?

Another possibility is that your record size is even bigger than the configured 
segment size (by default it is 1 GB, so if you do not override it, that should 
not happen).
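
One way to check both limits on the topic in question is via the admin client's 
describeConfigs() (bootstrap address and topic name below are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class CheckTopicLimits {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            // Compare the record size from the error against these two limits.
            System.out.println("max.message.bytes = " + config.get(TopicConfig.MAX_MESSAGE_BYTES_CONFIG).value());
            System.out.println("segment.bytes     = " + config.get(TopicConfig.SEGMENT_BYTES_CONFIG).value());
        }
    }
}
{code}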

> Kafka Streams - Rebalancing exception in Kafka 1.0.0
> 
>
> Key: KAFKA-6631
> URL: https://issues.apache.org/jira/browse/KAFKA-6631
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Container Linux by CoreOS 1576.5.0
>Reporter: Alexander Ivanichev
>Priority: Critical
>
>  
> In Kafka Streams 1.0.0 we saw a strange rebalance error. Our stream app 
> performs window-based aggregations. Sometimes on startup, when all stream 
> workers join, the app just crashes; however, if we enable only one worker it 
> works fine. Sometimes two workers also work just fine, but when a third 
> joins, the app crashes again: some critical issue with rebalance.
> {code:java}
> 018-03-08T18:51:01.226243000Z org.apache.kafka.common.KafkaException: 
> Unexpected error from SyncGroup: The server experienced an unexpected error 
> when processing the request
> 2018-03-08T18:51:01.226557000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
> 2018-03-08T18:51:01.22686Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
> 2018-03-08T18:51:01.227328000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
> 2018-03-08T18:51:01.22763Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
> 2018-03-08T18:51:01.228152000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> 2018-03-08T18:51:01.228449000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> 2018-03-08T18:51:01.228897000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> 2018-03-08T18:51:01.229196000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
> 2018-03-08T18:51:01.229673000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
> 2018-03-08T18:51:01.229971000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
> 2018-03-08T18:51:01.230436000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
> 2018-03-08T18:51:01.230749000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
> 2018-03-08T18:51:01.231065000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
> 2018-03-08T18:51:01.231584000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> 2018-03-08T18:51:01.231911000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> 2018-03-08T18:51:01.23219Z at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
> 2018-03-08T18:51:01.232643000Z at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
> 2018-03-08T18:51:01.233121000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> 2018-03-08T18:51:01.233409000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> 2018-03-08T18:51:01.23372Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
> 2018-03-08T18:51:01.234196000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> 

[jira] [Updated] (KAFKA-6641) Consider auto repartitioning for Stream.transform() API

2018-03-12 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6641:
-
Labels: api  (was: )

> Consider auto repartitioning for Stream.transform() API
> ---
>
> Key: KAFKA-6641
> URL: https://issues.apache.org/jira/browse/KAFKA-6641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: api
>
> Today, with map / mapValues in the Streams DSL, we set (or do not set) a 
> repartition flag on the underlying topology builder; but for transform / 
> transformValues, we do not make such marking choices. Maybe the topology 
> builder could still set such a flag for transform() to indicate that, since 
> the key may be changed, we should issue a repartition for the downstream 
> stateful operators when necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6641) Consider auto repartitioning for Stream.transform() API

2018-03-12 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6641:


 Summary: Consider auto repartitioning for Stream.transform() API
 Key: KAFKA-6641
 URL: https://issues.apache.org/jira/browse/KAFKA-6641
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today, with map / mapValues in the Streams DSL, we set (or do not set) a 
repartition flag on the underlying topology builder; but for transform / 
transformValues, we do not make such marking choices. Maybe the topology builder 
could still set such a flag for transform() to indicate that, since the key may 
be changed, we should issue a repartition for the downstream stateful operators 
when necessary.
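
A sketch of the situation described above, with a hypothetical re-keying 
transformer: today the repartition after transform() has to be forced by hand 
(e.g. via through() and a pre-created topic), whereas map()/selectKey() would 
have flagged the stream for automatic repartitioning:

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TransformRepartitionExample {

    // Illustrative transformer that derives a new key from the value, so the
    // existing partitioning is no longer valid for downstream stateful operators.
    static class RekeyingTransformer implements Transformer<String, String, KeyValue<String, String>> {
        @Override
        public void init(ProcessorContext context) { }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            return KeyValue.pair(value, value); // the key changes here
        }

        // Present in the pre-2.0 Transformer interface; unused in this sketch.
        public KeyValue<String, String> punctuate(long timestamp) {
            return null;
        }

        @Override
        public void close() { }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic"); // hypothetical topic

        TransformerSupplier<String, String, KeyValue<String, String>> rekey = RekeyingTransformer::new;

        // transform() does not mark the stream for auto-repartitioning, so the
        // shuffle is forced explicitly before the stateful count():
        source.transform(rekey)
              .through("app-rekeyed-repartition") // hypothetical, pre-created topic
              .groupByKey()
              .count();
    }
}
{code}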



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6640) Improve efficiency of KafkaAdminClient.describeTopics()

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396241#comment-16396241
 ] 

ASF GitHub Bot commented on KAFKA-6640:
---

lindong28 opened a new pull request #4694: KAFKA-6640; Improve efficiency of 
KafkaAdminClient.describeTopics()
URL: https://github.com/apache/kafka/pull/4694
 
 
   Currently in KafkaAdminClient.describeTopics(), for each topic in the 
request, a complete map of cluster and errors will be constructed for every 
topic and partition. This unnecessarily increases the complexity of 
describeTopics() to O(n^2). This patch improves the complexity to O(n).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve efficiency of KafkaAdminClient.describeTopics()
> ---
>
> Key: KAFKA-6640
> URL: https://issues.apache.org/jira/browse/KAFKA-6640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently in KafkaAdminClient.describeTopics(), for each topic in the 
> request, a complete map of cluster and errors will be constructed for every 
> topic and partition. This unnecessarily increases the complexity of 
> describeTopics() to O(n^2).
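
For context, the client call whose result assembly this ticket optimizes; a 
minimal usage sketch (bootstrap address and topic names are placeholders):

{code:java}
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopicsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // One request covers all topics of interest; the cost discussed in this
            // ticket is in how the client assembles the per-topic results.
            Map<String, TopicDescription> descriptions =
                    admin.describeTopics(Arrays.asList("topic-a", "topic-b")).all().get();
            descriptions.forEach((topic, description) ->
                    System.out.println(topic + " has " + description.partitions().size() + " partitions"));
        }
    }
}
{code}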



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6640) Improve efficiency of KafkaAdminClient.describeTopics()

2018-03-12 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6640:

Description: Currently in KafkaAdminClient.describeTopics(), for each topic 
in the request, a complete map of cluster and errors will be constructed for 
every topic and partition. This unnecessarily increases the complexity of 
describeTopics() to O(n^2).  (was: Currently in 
KafkaAdminClient.describeTopics(), for each topic in the request, a complete 
map of cluster and errors will be constructed for every topic and partition. 
This unnecessarily increases the complexity of describeTopics() to O(n^2). We 
can improve its efficiency to O(n))

> Improve efficiency of KafkaAdminClient.describeTopics()
> ---
>
> Key: KAFKA-6640
> URL: https://issues.apache.org/jira/browse/KAFKA-6640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently in KafkaAdminClient.describeTopics(), for each topic in the 
> request, a complete map of cluster and errors will be constructed for every 
> topic and partition. This unnecessarily increases the complexity of 
> describeTopics() to O(n^2).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6640) Improve efficiency of KafkaAdminClient.describeTopics()

2018-03-12 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6640:

Description: Currently in KafkaAdminClient.describeTopics(), for each topic 
in the request, a complete map of cluster and errors will be constructed for 
every topic and partition. This unnecessarily increases the complexity of 
describeTopics() to O(n^2). We can improve its efficiency to O(n)  (was: 
Currently in KafkaAdminClient.describeTopics(), for each topic in the request, 
a complete map of cluster and errors will be constructed for every topic and 
partition. This unnecessarily increases the complexity of describeTopics() to 
O(n^2).)

> Improve efficiency of KafkaAdminClient.describeTopics()
> ---
>
> Key: KAFKA-6640
> URL: https://issues.apache.org/jira/browse/KAFKA-6640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently in KafkaAdminClient.describeTopics(), for each topic in the 
> request, a complete map of cluster and errors will be constructed for every 
> topic and partition. This unnecessarily increases the complexity of 
> describeTopics() to O(n^2). We can improve its efficiency to O(n)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6640) Improve efficiency of KafkaAdminClient.describeTopics()

2018-03-12 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6640:

Description: Currently in KafkaAdminClient.describeTopics(), for each topic 
in the request, a complete map of cluster and errors will be constructed for 
every topic and partition. This unnecessarily increases the complexity of 
describeTopics() to O(n^2).  (was: Currently in 
KafkaAdminClient.describeTopics(), for each topic in the request, a complete 
map of cluster and errors will be constructed for every topic and partition. 
This is unnecessary and increases the complexity of describeTopics() to O(n^2).)

> Improve efficiency of KafkaAdminClient.describeTopics()
> ---
>
> Key: KAFKA-6640
> URL: https://issues.apache.org/jira/browse/KAFKA-6640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently in KafkaAdminClient.describeTopics(), for each topic in the 
> request, a complete map of cluster and errors will be constructed for every 
> topic and partition. This unnecessarily increases the complexity of 
> describeTopics() to O(n^2).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6640) Improve efficiency of KafkaAdminClient.describeTopics()

2018-03-12 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6640:

Description: Currently in KafkaAdminClient.describeTopics(), for each topic 
in the request, a complete map of cluster and errors will be constructed for 
every topic and partition. This is unnecessary and increases the complexity of 
describeTopics() to O(n^2).

> Improve efficiency of KafkaAdminClient.describeTopics()
> ---
>
> Key: KAFKA-6640
> URL: https://issues.apache.org/jira/browse/KAFKA-6640
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently in KafkaAdminClient.describeTopics(), for each topic in the 
> request, a complete map of cluster and errors will be constructed for every 
> topic and partition. This is unnecessary and increases the complexity of 
> describeTopics() to O(n^2).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6640) Improve efficiency of KafkaAdminClient.describeTopics()

2018-03-12 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-6640:
---

 Summary: Improve efficiency of KafkaAdminClient.describeTopics()
 Key: KAFKA-6640
 URL: https://issues.apache.org/jira/browse/KAFKA-6640
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-4831) Extract WindowedSerde to public APIs

2018-03-12 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-4831:


   Resolution: Fixed
 Assignee: Guozhang Wang  (was: Vitaly Pushkar)
Fix Version/s: 1.2.0

> Extract WindowedSerde to public APIs
> 
>
> Key: KAFKA-4831
> URL: https://issues.apache.org/jira/browse/KAFKA-4831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie, user-experience
> Fix For: 1.2.0
>
>
> Now that we have augmented WindowSerde with non-arg parameters, the next step 
> is to extract it out as part of the public APIs so that users who want to 
> read and write windowed streams can use it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3368) Add the Message/Record set protocol to the protocol docs

2018-03-12 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-3368.

   Resolution: Fixed
Fix Version/s: 1.2.0

> Add the Message/Record set protocol to the protocol docs
> 
>
> Key: KAFKA-3368
> URL: https://issues.apache.org/jira/browse/KAFKA-3368
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Andras Beni
>Priority: Major
> Fix For: 1.2.0
>
>
> The message/Record format is not a part of the standard Protocol.java class. 
> This should be added to the protocol or manually added to the doc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5540) Deprecate and remove internal converter configs

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396134#comment-16396134
 ] 

ASF GitHub Bot commented on KAFKA-5540:
---

C0urante opened a new pull request #4693: KAFKA-5540: Deprecate internal 
converter configs
URL: https://github.com/apache/kafka/pull/4693
 
 
   Configuration properties 'internal.key.converter' and 
'internal.value.converter'
   are deprecated, and default to org.apache.kafka.connect.json.JsonConverter.
   
   Warnings are logged if values are specified for either, or if properties that
   appear to configure instances of internal converters (i.e., ones prefixed 
with
   either 'internal.key.converter.' or 'internal.value.converter.') are given.
   
   The property 'schemas.enable' is also defaulted to false for internal
   JsonConverter instances (both for keys and values) if it isn't specified.
   
   Documentation and code have also been updated with deprecation notices and
   annotations, respectively.
   
   Unit tests have been updated in `PluginsTest` to account for the new 
defaults for `schemas.enable` for internal key/value converters, and to ensure 
that (for the time being), internal key/value converters are still configurable 
despite being deprecated.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deprecate and remove internal converter configs
> ---
>
> Key: KAFKA-5540
> URL: https://issues.apache.org/jira/browse/KAFKA-5540
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip
>
> The internal.key.converter and internal.value.converter were originally 
> exposed as configs because a) they are actually pluggable and b) providing a 
> default would require relying on the JsonConverter always being available, 
> which, until we had classloader isolation, could possibly have been removed 
> for compatibility reasons.
> However, this has ultimately just caused a lot more trouble and confusion 
> than it is worth. We should deprecate the configs, give them a default of 
> JsonConverter (which is also kind of nice since it results in human-readable 
> data in the internal topics), and then ultimately remove them in the next 
> major version.
> These are all public APIs so this will need a small KIP before we can make 
> the change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6639) Follower may have sparse index if catching up

2018-03-12 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6639:
--

 Summary: Follower may have sparse index if catching up
 Key: KAFKA-6639
 URL: https://issues.apache.org/jira/browse/KAFKA-6639
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Dhruvil Shah


When a follower is catching up, it may fetch considerably more data than the 
size of the offset index interval. Since we only write to the index once for 
every append, this could lead to a sparsely populated index, which may have a 
performance impact if the follower ever becomes leader.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-03-12 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6535:
-
Description: 
After KIP-220 / KIP-204, repartition topics in Streams are transient, so it is 
better to set their default retention to infinity to allow any records to be 
pushed to them with old timestamps (think: bootstrapping, re-processing) and 
just rely on the purging API to keep their storage small.

More specifically, in {{RepartitionTopicConfig}} we have a few default 
overrides for repartition topic configs; we should just add an override for 
{{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still 
allows users to override it themselves if they want via 
{{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this 
update takes effect.

In addition to the code change, we also need doc changes in 
streams/upgrade_guide.html specifying this default value change.
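
A sketch of the per-application override mentioned above, via 
StreamsConfig.topicPrefix() (application id and bootstrap address are 
placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RepartitionRetentionOverride {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Internal topic config override; with the change proposed here the
        // repartition-topic default would become Long.MAX_VALUE, and users
        // could still lower it per application like this:
        props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "86400000"); // 1 day
    }
}
{code}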

  was:After KIP-220 / KIP-204, repartition topics in Streams are transient, so 
it is better to set their default retention to infinity to allow any records to 
be pushed to them with old timestamps (think: bootstrapping, re-processing) and 
just rely on the purging API to keep their storage small.


> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity to allow any records to 
> be pushed to them with old timestamps (think: bootstrapping, re-processing) 
> and just rely on the purging API to keep their storage small.
> More specifically, in {{RepartitionTopicConfig}} we have a few default 
> overrides for repartition topic configs; we should just add an override for 
> {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still 
> allows users to override it themselves if they want via 
> {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify this 
> update takes effect.
> In addition to the code change, we also need doc changes in 
> streams/upgrade_guide.html specifying this default value change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-03-12 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395884#comment-16395884
 ] 

Guozhang Wang commented on KAFKA-6535:
--

Makes sense, I've changed the title accordingly.

And I've also updated the description so that anyone can pick it up more easily.

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity to allow any records to 
> be pushed to them with old timestamps (think: bootstrapping, re-processing) 
> and just rely on the purging API to keep their storage small.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-03-12 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6535:
-
Summary: Set default retention ms for Streams repartition topics to 
Long.MAX_VALUE  (was: Set default retention ms for Streams repartition topics 
to infinity)

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity to allow any records to 
> be pushed to them with old timestamps (think: bootstrapping, re-processing) 
> and just rely on the purging API to keep their storage small.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6638) Controller should remove replica from ISR if the replica is removed from the replica set

2018-03-12 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-6638:
---

 Summary: Controller should remove replica from ISR if the replica 
is removed from the replica set
 Key: KAFKA-6638
 URL: https://issues.apache.org/jira/browse/KAFKA-6638
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6636) ReplicaFetcherThread should not die if hw < 0

2018-03-12 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6636?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6636:

Description: 
ReplicaFetcherThread can die in the following scenario:

 

1) Partition P1 has replica set size 1. Broker A is the leader. The segment is 
empty and log start offset is 100

2) User executes partition reassignment to change replica set from \{A} to \{B, 
C}

3) Broker B starts ReplicaFetcherThread, which triggers 
handleOffsetOutOfRange(), truncates the log fully and starts at offset 100. At 
this moment its high watermark is still 0 (or -1). Same for broker C.

4) Broker B sends FetchRequest to A at offset 100, broker A immediately adds 
broker B to ISR set, and controller moves leadership to broker B.

5) Broker B handles LeaderAndIsrRequest to become leader. It calls 
`leaderReplica.convertHWToLocalOffsetMetadata()` to initialize its HW. Since 
its HW was smaller than logStartOffset=100, now its HW will be overridden to 
LogOffsetMetadata.UnknownOffsetMetadata, i.e. -1.

6) Broker C handles LeaderAndIsrRequest to fetch from broker B. Broker C 
updates its HW to the FetchRequest's HW, i.e. -1. Then broker C calls 
replica.maybeIncrementLogStartOffset(leaderLogStartOffset) where 
leaderLogStartOffset=100. This causes an exception because leaderLogStartOffset > HW. 
This is an unhandled exception, and thus the ReplicaFetcherThread will exit.

> ReplicaFetcherThread should not die if hw < 0
> -
>
> Key: KAFKA-6636
> URL: https://issues.apache.org/jira/browse/KAFKA-6636
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> ReplicaFetcherThread can die in the following scenario:
>  
> 1) Partition P1 has replica set size 1. Broker A is the leader. The segment 
> is empty and log start offset is 100
> 2) User executes partition reassignment to change replica set from \{A} to 
> \{B, C}
> 3) Broker B starts ReplicaFetcherThread, which triggers 
> handleOffsetOutOfRange(), truncates the log fully and starts at offset 100. At 
> this moment its high watermark is still 0 (or -1). Same for broker C.
> 4) Broker B sends FetchRequest to A at offset 100, broker A immediately adds 
> broker B to ISR set, and controller moves leadership to broker B.
> 5) Broker B handles LeaderAndIsrRequest to become leader. It calls 
> `leaderReplica.convertHWToLocalOffsetMetadata()` to initialize its HW. Since 
> its HW was smaller than logStartOffset=100, now its HW will be overridden to 
> LogOffsetMetadata.UnknownOffsetMetadata, i.e. -1.
> 6) Broker C handles LeaderAndIsrRequest to fetch from broker B. Broker C 
> updates its HW to the FetchRequest's HW, i.e. -1. Then broker C calls 
> replica.maybeIncrementLogStartOffset(leaderLogStartOffset) where 
> leaderLogStartOffset=100. This causes an exception because leaderLogStartOffset > HW. 
> This is an unhandled exception, and thus the ReplicaFetcherThread will exit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6560) Use single-point queries than range queries for windowed aggregation operators

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395807#comment-16395807
 ] 

ASF GitHub Bot commented on KAFKA-6560:
---

guozhangwang closed pull request #4685: KAFKA-6560: Add docs for KIP-261
URL: https://github.com/apache/kafka/pull/4685
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 1d5a342b2bb..f5c16c08cd2 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -34,9 +34,17 @@ Upgrade Guide and API Changes
 
 
 
-If you are using Java 7 and want to upgrade from 1.0.x to 1.1.0 you 
don't need to make any code changes as the public API is fully backward 
compatible.
-If you are using Java 8 method references in your Kafka Streams code 
you might need to update your code to resolve method ambiguties.
-   Hot-swaping the jar-file only might not work for this case.
+If you want to upgrade from 1.1.x to 1.2.0 and you have customized 
window store implementations on the ReadOnlyWindowStore interface
+you'd need to update your code to incorporate the newly added public 
APIs; otherwise you don't need to make any code changes.
+See below for a complete list 
of 1.2.0 API and semantic changes that allow you to advance your application 
and/or simplify your code base.
+
+
+
+If you want to upgrade from 1.0.x to 1.1.0 and you have customized 
window store implementations on the ReadOnlyWindowStore interface
+you'd need to update your code to incorporate the newly added public 
APIs.
+Otherwise, if you are using Java 7 you don't need to make any code 
changes as the public API is fully backward compatible;
+but if you are using Java 8 method references in your Kafka Streams 
code you might need to update your code to resolve method ambiguities.
+Hot-swaping the jar-file only might not work for this case.
 See below for a complete list 
of 1.1.0 API and semantic changes that allow you to advance your application 
and/or simplify your code base.
 
 
@@ -64,12 +72,21 @@ Upgrade Guide and API Changes
 See below a complete list of 
0.10.1 API changes that allow you to advance your application and/or simplify 
your code base, including the usage of new features.
 
 
+
+Streams API changes in 1.2.0
+
+We have added support for methods in ReadOnlyWindowStore 
which allows for querying a single window's key-value pair.
+For users who have customized window store implementations on the 
above interface, they'd need to update their code to implement the newly added 
method as well.
+For more details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-261%3A+Add+Single+Value+Fetch+in+Window+Stores;>KIP-261.
+
+
 Streams API changes in 1.1.0
 
-   We have added support for methods in ReadOnlyWindowStore 
which allows for querying WindowStores without the neccesity of 
providing keys.
+We have added support for methods in ReadOnlyWindowStore 
which allows for querying WindowStores without the necessity of 
providing keys.
+For users who have customized window store implementations on the 
above interface, they'd need to update their code to implement the newly added 
method as well.
+For more details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-205%3A+Add+all%28%29+and+range%28%29+API+to+ReadOnlyWindowStore;>KIP-205.
 
 
-
 
There is a new artifact kafka-streams-test-utils providing 
a TopologyTestDriver, ConsumerRecordFactory, and 
OutputVerifier class.
You can include the new artifact as a regular dependency to your unit 
tests and use the test driver to test your business logic of your Kafka Streams 
application.


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use single-point queries than range queries for windowed aggregation operators
> --
>
> Key: KAFKA-6560
> URL: https://issues.apache.org/jira/browse/KAFKA-6560
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: needs-kip
>  

[jira] [Updated] (KAFKA-6637) if set topic config segment.ms=0 Kafka broker won't be able to start

2018-03-12 Thread Chong Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chong Wang updated KAFKA-6637:
--
Affects Version/s: 1.0.0

> if set topic config segment.ms=0 Kafka broker won't be able to start
> 
>
> Key: KAFKA-6637
> URL: https://issues.apache.org/jira/browse/KAFKA-6637
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Chong Wang
>Priority: Major
>
> If the topic config segment.ms is set to 0, the Kafka server won't be able to 
> start because of a FATAL error:
> [2018-03-12 19:05:40,196] FATAL [KafkaServer id=2] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) 
> java.lang.ArithmeticException: / by zero at 
> kafka.log.LogConfig.randomSegmentJitter(LogConfig.scala:100) at 
> kafka.log.Log.loadSegments(Log.scala:419) at 
> kafka.log.Log.(Log.scala:203) at kafka.log.Log$.apply(Log.scala:1734) 
> at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221) 
> at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61) at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> [https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L100]
> So the documented minimum value shouldn't be 0:
> https://kafka.apache.org/documentation/#topicconfigs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6637) if set topic config segment.ms=0 Kafka broker won't be able to start

2018-03-12 Thread Chong Wang (JIRA)
Chong Wang created KAFKA-6637:
-

 Summary: if set topic config segment.ms=0 Kafka broker won't be 
able to start
 Key: KAFKA-6637
 URL: https://issues.apache.org/jira/browse/KAFKA-6637
 Project: Kafka
  Issue Type: Bug
Reporter: Chong Wang


If the topic config segment.ms is set to 0, the Kafka server won't be able to start 
because of a FATAL error:

[2018-03-12 19:05:40,196] FATAL [KafkaServer id=2] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) 
java.lang.ArithmeticException: / by zero at 
kafka.log.LogConfig.randomSegmentJitter(LogConfig.scala:100) at 
kafka.log.Log.loadSegments(Log.scala:419) at 
kafka.log.Log.(Log.scala:203) at kafka.log.Log$.apply(Log.scala:1734) at 
kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221) at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)

[https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L100]

So the documented minimum value shouldn't be 0:
https://kafka.apache.org/documentation/#topicconfigs
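
A small sketch of why segment.ms=0 is fatal and what a guard could look like. The class below 
is illustrative only, not the actual kafka.log.LogConfig code:

import java.util.concurrent.ThreadLocalRandom;

public class SegmentJitterSketch {

    // Mirrors the failing pattern from the stack trace: jitter is taken modulo
    // min(segment.jitter.ms, segment.ms), so segment.ms=0 ends in "/ by zero".
    static long unsafeJitter(long segmentJitterMs, long segmentMs) {
        return Math.abs(ThreadLocalRandom.current().nextInt()) % Math.min(segmentJitterMs, segmentMs);
    }

    // One possible guard: validate (or clamp) a zero segment.ms before it reaches the modulo,
    // which is effectively what enforcing a minimum of 1 on the config would achieve.
    static long safeJitter(long segmentJitterMs, long segmentMs) {
        if (segmentJitterMs <= 0 || segmentMs <= 0) {
            return 0L;
        }
        return Math.abs(ThreadLocalRandom.current().nextInt()) % Math.min(segmentJitterMs, segmentMs);
    }

    public static void main(String[] args) {
        System.out.println(safeJitter(10_000L, 0L));   // prints 0, no crash
        System.out.println(unsafeJitter(10_000L, 0L)); // throws ArithmeticException: / by zero
    }
}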



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6636) ReplicaFetcherThread should not die if hw < 0

2018-03-12 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-6636:
---

 Summary: ReplicaFetcherThread should not die if hw < 0
 Key: KAFKA-6636
 URL: https://issues.apache.org/jira/browse/KAFKA-6636
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6624) log segment deletion could cause a disk to be marked offline incorrectly

2018-03-12 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned KAFKA-6624:
---

Assignee: Dong Lin

> log segment deletion could cause a disk to be marked offline incorrectly
> 
>
> Key: KAFKA-6624
> URL: https://issues.apache.org/jira/browse/KAFKA-6624
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Assignee: Dong Lin
>Priority: Major
>
> Saw the following log.
> [2018-03-06 23:12:20,721] ERROR Error while flushing log for topic1-0 in dir 
> /data01/kafka-logs with offset 80993 (kafka.server.LogDirFailureChannel)
> java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>         at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:379)
>         at 
> org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:163)
>         at 
> kafka.log.LogSegment$$anonfun$flush$1.apply$mcV$sp(LogSegment.scala:375)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at kafka.log.LogSegment.flush(LogSegment.scala:374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1374)
>         at 
> kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1373)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at kafka.log.Log$$anonfun$flush$1.apply$mcV$sp(Log.scala:1373)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.flush(Log.scala:1368)
>         at 
> kafka.log.Log$$anonfun$roll$2$$anonfun$apply$1.apply$mcV$sp(Log.scala:1343)
>         at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>         at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2018-03-06 23:12:20,722] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /data01/kafka-logs (kafka.server.ReplicaManager)
> It seems that topic1 was being deleted around the time when flushing was 
> called. Then flushing hit an IOException, which caused the disk to be marked 
> offline incorrectly.
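
A hypothetical sketch of the kind of guard that would avoid the false disk-offline marking. 
None of the names below are Kafka's actual code; they only illustrate treating a flush that 
races with topic deletion differently from a genuine I/O failure:

import java.io.IOException;
import java.nio.channels.ClosedChannelException;

public class SafeFlushSketch {

    interface Segment {
        void flush() throws IOException;
    }

    /** Returns true if the flush succeeded, false if the log was concurrently deleted. */
    public static boolean flushUnlessDeleted(Segment segment, boolean logScheduledForDeletion) {
        try {
            segment.flush();
            return true;
        } catch (ClosedChannelException e) {
            if (logScheduledForDeletion) {
                // Deletion closed the file channel underneath us; this is not a disk failure,
                // so do not mark the log directory offline.
                return false;
            }
            throw new RuntimeException("Real I/O failure, mark log dir offline", e);
        } catch (IOException e) {
            throw new RuntimeException("Real I/O failure, mark log dir offline", e);
        }
    }
}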



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4984) Unable to produce or consume when enabling authentication SASL/Kerberos

2018-03-12 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4984.
--
Resolution: Cannot Reproduce

Closing inactive issue. Please reopen if you think the issue still exists.
 

> Unable to produce or consume when enabling authentication SASL/Kerberos
> ---
>
> Key: KAFKA-4984
> URL: https://issues.apache.org/jira/browse/KAFKA-4984
> Project: Kafka
>  Issue Type: Bug
> Environment: Ubuntu 16.04LTS running in VirtualBox
>Reporter: Ait haj Slimane
>Priority: Critical
> Attachments: Screenshot from 2017-03-30 15-36-30.png, logKafka.txt, 
> logZookeeper.txt
>
>
> I have a problem while trying to produce or consume on a Kerberos-enabled 
> cluster.
> I launched a single broker and a console producer,
> using SASL authentication between the producer and the broker.
> When I run the producer, I get the result attached below.
> Any advice on what could cause the problem?
> Thanks!
> 
> configuration used:
> server.properties:
> listeners=SASL_PLAINTEXT://kafka.example.com:9092
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=GSSAPI
> sasl.enabled.mechanism=GSSAPI
> sasl.kerberos.service.name=kafka
> producer.properties
> bootstrap.servers=kafka.example.com:9092
> sasl.kerberos.service.name=kafka
> security.protocol=SASL_PLAINTEXT
> kafka_client_jaas.conf
> KafkaClient {
> com.sun.security.auth.module.Krb5LoginModule required
> useKeyTab=true
> storeKey=true
> keyTab="/etc/kafka/keytabs/kafkaclient.keytab"
> principal="kafkaclient/kafka.example@example.com";
> };



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5504) Kafka controller is not getting elected

2018-03-12 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5504.
--
Resolution: Not A Problem

Closing as per above comments.

> Kafka controller is not getting elected
> ---
>
> Key: KAFKA-5504
> URL: https://issues.apache.org/jira/browse/KAFKA-5504
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.1
>Reporter: Ashish Kumar
>Priority: Major
>
> I have a Kafka cluster of 20 nodes and had been facing an under-replicated 
> partitions issue for the last few days, so I decided to restart the broker 
> that was acting as controller. After the restart I am getting the logs below 
> on all the brokers (it seems the controller is never finalized and leader 
> election keeps happening continuously):
> [2017-06-23 02:59:50,388] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:50,396] INFO New leader is 12 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2017-06-23 02:59:50,410] INFO Rolled new log segment for 
> 'dpe_feedback_rating_history-4' in 0 ms. (kafka.log.Log)
> [2017-06-23 02:59:51,585] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:51,590] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:51,609] INFO New leader is 11 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2017-06-23 02:59:52,792] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:52,799] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:52,808] INFO New leader is 12 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2017-06-23 02:59:54,122] INFO New leader is 3 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2017-06-23 02:59:55,504] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:55,512] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:55,520] INFO New leader is 11 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2017-06-23 02:59:56,695] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:56,701] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:56,709] INFO New leader is 11 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2017-06-23 02:59:57,949] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:57,955] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:57,965] INFO New leader is 12 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2017-06-23 02:59:59,378] INFO Creating /controller (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:59,384] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2017-06-23 02:59:59,395] INFO New leader is 12 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> .
> .
> .
> I tried deleting the controller znode (/controller) but had no luck. Please 
> let me know if any fix is possible here.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6022) mirror maker stores offset in zookeeper

2018-03-12 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6022.
--
Resolution: Not A Problem

Please reopen if you think the issue still exists

> mirror maker stores offset in zookeeper
> ---
>
> Key: KAFKA-6022
> URL: https://issues.apache.org/jira/browse/KAFKA-6022
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ronald van de Kuil
>Priority: Minor
>
> I happened to notice that the mirror maker stores its offset in zookeeper. 
> I do not see it using:
> bin/kafka-consumer-groups.sh --bootstrap-server pi1:9092 --new-consumer --list
> I do however see consumers that store their offset in kafka.
> I would guess that storing the offset in zookeeper is old style?
> Would it be an idea to update the mirror maker to the new consumer style?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5467) setting offset retention minutes to a lower value is not reflecting

2018-03-12 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5467.
--
Resolution: Cannot Reproduce

Closing inactive issue. Please reopen if you think the issue exists.

> setting offset retention minutes to a lower value is not reflecting
> ---
>
> Key: KAFKA-5467
> URL: https://issues.apache.org/jira/browse/KAFKA-5467
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 0.10.1.1
>Reporter: Divya
>Priority: Major
>
> We have been observing offsets showing up as unknown and saw that our offset 
> retention time was less than the log retention period. In order to recreate 
> this in a test environment, we set offset.retention.minutes to 1 minute 
> and the log retention time to 168 hours. There were no events written for 
> more than an hour, but the offsets still did not turn unknown. (The offset 
> cleanup interval was 10 minutes.) I would like to know why the offsets did 
> not turn unknown within an hour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6195) DNS alias support for secured connections

2018-03-12 Thread Jonathan Skrzypek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395521#comment-16395521
 ] 

Jonathan Skrzypek commented on KAFKA-6195:
--

I have updated the KIP. Thoughts?

> DNS alias support for secured connections
> -
>
> Key: KAFKA-6195
> URL: https://issues.apache.org/jira/browse/KAFKA-6195
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Jonathan Skrzypek
>Priority: Major
>
> It seems clients can't use a DNS alias in front of a secured Kafka cluster.
> So applications can only specify a list of hosts or IPs in bootstrap.servers 
> instead of an alias encompassing all cluster nodes.
> Using an alias in bootstrap.servers results in the following error: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Fail to create credential. (63) - No service creds)]) 
> occurred when evaluating SASL token received from the Kafka Broker. Kafka 
> Client will go to AUTH_FAILED state. [Caused by 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Fail to create 
> credential. (63) - No service creds)]]
> When using SASL/Kerberos authentication, the kafka server principal is of the 
> form kafka@kafka/broker1.hostname@example.com
> Kerberos requires that the hosts can be resolved by their FQDNs.
> During the SASL handshake, the client creates a SASL token and then sends it 
> to Kafka for auth.
> But to create a SASL token the client first needs to be able to validate that 
> the broker's Kerberos principal is a valid one.
> There are 3 potential options:
> 1. Create a single Kerberos principal linked not to a host but to an alias, 
> and reference it in the broker JAAS file.
> But I think the Kerberos infrastructure would refuse to validate it, so the 
> SASL handshake would still fail.
> 2. Modify the client bootstrap mechanism to detect whether bootstrap.servers 
> contains a DNS alias. If it does, resolve and expand the alias to retrieve 
> all hostnames behind it and add them to the list of nodes.
> This could be done by modifying parseAndValidateAddresses() in ClientUtils 
> (a sketch follows below).
> 3. Add a cluster.alias parameter that would be handled by the logic above, 
> keeping a separate parameter to avoid confusion about how bootstrap.servers 
> works behind the scenes.
> Thoughts?
> I would be happy to contribute the change for any of the options.
> I believe the ability to use a DNS alias instead of static lists of brokers 
> would bring good deployment flexibility.
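
A minimal sketch of option 2 above, expanding a DNS alias into per-broker FQDNs before 
bootstrapping. This is an assumption of how it could look, not the actual ClientUtils change; 
the alias and port are illustrative:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class BootstrapAliasExpander {

    /** Resolve an alias like "kafka.example.com:9092" to "host1:9092, host2:9092, ...". */
    public static List<String> expand(String aliasWithPort) throws UnknownHostException {
        String[] parts = aliasWithPort.split(":");
        String alias = parts[0];
        String port = parts.length > 1 ? parts[1] : "9092";
        List<String> servers = new ArrayList<>();
        for (InetAddress addr : InetAddress.getAllByName(alias)) {
            // getCanonicalHostName() performs a reverse lookup, yielding the FQDN that
            // Kerberos expects when validating the broker's service principal.
            servers.add(addr.getCanonicalHostName() + ":" + port);
        }
        return servers;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(expand("kafka.example.com:9092"));
    }
}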



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6337) Error for partition [__consumer_offsets,15] to broker

2018-03-12 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6337.
--
Resolution: Cannot Reproduce

Please reopen if you think the issue still exists

> Error for partition [__consumer_offsets,15] to broker
> -
>
> Key: KAFKA-6337
> URL: https://issues.apache.org/jira/browse/KAFKA-6337
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
> Environment: Windows running Kafka(0.10.2.0)
> 3 ZK Instances running on 3 different Windows Servers, 7 Kafka Broker nodes 
> running on single windows machine with different disk for logs directory.
>Reporter: Abhi
>Priority: Blocker
>  Labels: windows
>
> Hello *
> I have been running Kafka (0.10.2.0) on Windows for the past year.
> But of late there have been unusual broker issues that I have observed 4-5 
> times in
> the last 4 months.
> Kafka setup config:
> 3 ZK instances running on 3 different Windows servers, 7 Kafka broker nodes 
> running on a single Windows machine with a different disk for each log directory.
> My Kafka has 2 topics with 50 partitions each, and a replication factor of 
> 3.
> My partition selection logic: each message has a unique ID, the partition is 
> selected as (unique ID % 50), and the Kafka producer API is then called 
> to route the message to that topic partition.
> My Each Broker Properties look like this
> {{broker.id=0
> port:9093
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> offsets.retention.minutes=360
> advertised.host.name=1.1.1.2
> advertised.port:9093
> # A comma separated list of directories under which to store log files
> log.dirs=C:\\kafka_2.10-0.10.2.0-SNAPSHOT\\data\\kafka-logs
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> log.retention.minutes=360
> log.segment.bytes=52428800
> log.retention.check.interval.ms=30
> log.cleaner.enable=true
> log.cleanup.policy=delete
> log.cleaner.min.cleanable.ratio=0.5
> log.cleaner.backoff.ms=15000
> log.segment.delete.delay.ms=6000
> auto.create.topics.enable=false
> zookeeper.connect=1.1.1.2:2181,1.1.1.3:2182,1.1.1.4:2183
> zookeeper.connection.timeout.ms=6000
> }}
> But of late a unique issue has been cropping up in the Kafka broker 
> nodes:
> _[2017-12-02 02:47:40,024] ERROR [ReplicaFetcherThread-0-4], Error for 
> partition [__consumer_offsets,15] to broker 
> 4:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
> is not the leader for that topic-partition. 
> (kafka.server.ReplicaFetcherThread)_
> The entire server.log is filled with these messages, and it is very large too. 
> Please help me understand under what circumstances these can occur and 
> what measures I need to take. 
> This is the third time in the last three Saturdays that I have faced a 
> similar issue. 
> Courtesy
> Abhi
> !wq 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6556) allow to attach callbacks in kafka streams, to be triggered when a window is expired

2018-03-12 Thread Abhishek Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395095#comment-16395095
 ] 

Abhishek Agarwal commented on KAFKA-6556:
-

I am working on something similar (currently for tumbling windows). A construct 
to receive only the final updates for a window is quite useful and powerful. 

> allow to attach callbacks in kafka streams, to be triggered when a window is 
> expired 
> -
>
> Key: KAFKA-6556
> URL: https://issues.apache.org/jira/browse/KAFKA-6556
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: igor mazor
>Priority: Major
>
> Allowing callbacks to be attached in Kafka Streams, triggered when a window 
> expires, would help in use cases where the final state of the window is required.
>  It would also be useful if, together with that functionality, the user could 
> control whether the callback is triggered in addition to emitting the normal 
> changelog downstream, or only the callback is triggered when the window 
> expires (for example when only the final window state is required, and any 
> updates to the window state during the window interval are not important).
> An example use case is a left join with proper SQL semantics:
>  A left join B currently emits (a1,null) and then (a1,b1) if b1 arrives 
> within the defined join time window.
>  What I would like is to have ONLY ONE result:
>  (a1,null) if no b1 arrived during the defined join time window, OR ONLY 
> (a1,b1) if b1 did arrive in the specified join time window.
>  One possible solution could be to use the current Kafka Streams left join 
> with a downstream processor which puts the results in a windowed KTable 
> (a minimal sketch follows below).
>  The window size would be the same as for the left join operation; however, only 
> the final state of that window would be emitted downstream once the time 
> window expires.
>  So if the left join produces (a1, null) and after X minutes no (a1, b1) was 
> produced, eventually only (a1, null) would be emitted; on the other hand, if 
> the left join produces (a1, null) and after X-t minutes (a1, b1) was produced 
> by the left join operation, only (a1, b1) would eventually be emitted down 
> the stream after X minutes.
>  
> Another use case is when the window state is written to another Kafka topic 
> which is then used to persist the window states into a DB. However, many 
> times only the final window state is required, and functionality to get only 
> the last window state would help in reducing load on the DB, since only the 
> final window state would be persisted, instead of multiple DB writes for each 
> window state update. 
>  
>  
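
A minimal sketch of the workaround described above: a Processor that buffers the latest update 
per key and only forwards it on punctuation, approximating a final-result-only callback. The 
store name and the 60-second delay are illustrative assumptions, and the state store must be 
created and connected to this processor in the topology:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class FinalResultProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> buffer;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        buffer = (KeyValueStore<String, String>) context.getStateStore("final-result-buffer");
        // Every 60s (a stand-in for "window size plus grace"), emit the buffered value per key
        // exactly once and clear it, so downstream only ever sees the final update.
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            final List<KeyValue<String, String>> ready = new ArrayList<>();
            try (KeyValueIterator<String, String> it = buffer.all()) {
                while (it.hasNext()) {
                    ready.add(it.next());
                }
            }
            for (final KeyValue<String, String> entry : ready) {
                context.forward(entry.key, entry.value);
                buffer.delete(entry.key);
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        // Overwrite earlier updates; only the value present at punctuation time is emitted.
        buffer.put(key, value);
    }
}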



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-03-12 Thread Filipe Agapito (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395070#comment-16395070
 ] 

Filipe Agapito commented on KAFKA-6474:
---

Hi John,
So far I've done most of the refactor and I haven't had any complaints from 
gradle (just did a clean compile). Now I'm refactoring the last class, 
KStreamTestDriver. Probably the circular dependency issue will show up now. 
It's nice to know it is a known issue. Thanks for the heads up!

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our own 
> tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4493) Connections to Kafka brokers should be validated

2018-03-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16395002#comment-16395002
 ] 

ASF GitHub Bot commented on KAFKA-4493:
---

taku-k closed pull request #2408: KAFKA-4493: Validate a plaintext client 
connection to a SSL broker
URL: https://github.com/apache/kafka/pull/2408
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/InvalidTransportLayerException.java
 
b/clients/src/main/java/org/apache/kafka/common/network/InvalidTransportLayerException.java
new file mode 100644
index 000..b1b69af09d6
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/InvalidTransportLayerException.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.KafkaException;
+
+public class InvalidTransportLayerException extends KafkaException {
+
+public InvalidTransportLayerException(String message) {
+super(message);
+}
+
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 409775cd978..c7a35d1f552 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -12,11 +12,14 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.utils.Utils;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.ScatteringByteChannel;
+import java.util.regex.Pattern;
 
 /**
  * A size delimited Receive that consists of a 4 byte network-ordered size N 
followed by N bytes of content
@@ -31,6 +34,13 @@
 private final int maxSize;
 private ByteBuffer buffer;
 
+private boolean ensureThrough = false;
+private ByteBuffer tempOverBuf;
+/**
+ * Supporting TLSv1, TLSv1.1 and TLSv1.2
+ */
+private final static Pattern SSL_HANDSHAKE_ALERT = 
Pattern.compile("15030[123]00");
+
 
 public NetworkReceive(String source, ByteBuffer buffer) {
 this.source = source;
@@ -82,6 +92,9 @@ public long readFromReadableChannel(ReadableByteChannel 
channel) throws IOExcept
 if (bytesRead < 0)
 throw new EOFException();
 read += bytesRead;
+
+ensureNotHandshakeFailurePacket(channel, size);
+
 if (!size.hasRemaining()) {
 size.rewind();
 int receiveSize = size.getInt();
@@ -93,6 +106,15 @@ public long readFromReadableChannel(ReadableByteChannel 
channel) throws IOExcept
 this.buffer = ByteBuffer.allocate(receiveSize);
 }
 }
+
+// Packet head is the same as a SSL handshake bytes but actually the 
packet is correct.
+// Over read data must be inserted into the buffer.
+if (tempOverBuf != null) {
+buffer.put(tempOverBuf);
+read += 1024;
+tempOverBuf = null;
+}
+
 if (buffer != null) {
 int bytesRead = channel.read(buffer);
 if (bytesRead < 0)
@@ -107,4 +129,23 @@ public ByteBuffer payload() {
 return this.buffer;
 }
 
+// We determine if a peer connection uses SSL/TLS by seeing from the first 
few bytes.
+private void ensureNotHandshakeFailurePacket(ReadableByteChannel channel, 
ByteBuffer size) throws IOException {
+if (ensureThrough)
+return;
+ensureThrough = true;
+String head = Utils.hexToString(size.array());
+if (SSL_HANDSHAKE_ALERT.matcher(head).find()) {
+// Actually, SSL record size is 2 bytes, but head byte is already 
read.
+ByteBuffer recordSizeBuf = ByteBuffer.allocate(1);
+  

[jira] [Commented] (KAFKA-6317) Maven artifact for kafka should not depend on log4j

2018-03-12 Thread Gérald Quintana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394958#comment-16394958
 ] 

Gérald Quintana commented on KAFKA-6317:


Does that mean that we can replace Log4J1 with Log4J2 or Logback in the tarball?

https://issues.apache.org/jira/browse/KAFKA-1368

> Maven artifact for kafka should not depend on log4j
> ---
>
> Key: KAFKA-6317
> URL: https://issues.apache.org/jira/browse/KAFKA-6317
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 1.1.0
>
>
> It should only depend on slf4j-api. The release tarball should still depend 
> on log4j and slf4j-log4j12.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1368) Upgrade log4j

2018-03-12 Thread Gérald Quintana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16394956#comment-16394956
 ] 

Gérald Quintana commented on KAFKA-1368:


How is this related to https://issues.apache.org/jira/browse/KAFKA-6317 ?

> Upgrade log4j
> -
>
> Key: KAFKA-1368
> URL: https://issues.apache.org/jira/browse/KAFKA-1368
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.0
>Reporter: Vladislav Pernin
>Assignee: Mickael Maison
>Priority: Major
>
> Upgrade log4j to at least 1.2.16 or 1.2.17.
> Usage of EnhancedPatternLayout will then be possible.
> It allows setting delimiters around the full log event, stack trace included, 
> making log message collection easier with tools like Logstash.
> Example: <[%d{}]...[%t] %m%throwable>%n
> <[2014-04-08 11:07:20,360] ERROR [KafkaApi-1] Error when processing fetch 
> request for partition [X,6] offset 700 from consumer with correlation id 
> 0 (kafka.server.KafkaApis)
> kafka.common.OffsetOutOfRangeException: Request for offset 700 but we only 
> have log segments in the range 16021 to 16021.
> at kafka.log.Log.read(Log.scala:429)
> ...
> at java.lang.Thread.run(Thread.java:744)>
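
A minimal Java sketch of what the requested upgrade enables, assuming log4j >= 1.2.16 is on 
the classpath. The pattern is only an example of wrapping the full event, stack trace 
included, in delimiters:

import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.EnhancedPatternLayout;
import org.apache.log4j.Logger;

public class EnhancedLayoutDemo {

    public static void main(String[] args) {
        // %throwable pulls the stack trace inside the <...> delimiters, so a log shipper
        // like Logstash can treat the whole event as one message.
        EnhancedPatternLayout layout =
                new EnhancedPatternLayout("<[%d{ISO8601}] %p [%t] %m%throwable>%n");
        Logger.getRootLogger().addAppender(new ConsoleAppender(layout));
        Logger.getLogger(EnhancedLayoutDemo.class)
              .error("Error when processing fetch request", new RuntimeException("boom"));
    }
}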



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)