[jira] [Commented] (KAFKA-3978) Cannot truncate to a negative offset (-1) exception at broker startup

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398124#comment-16398124
 ] 

ASF GitHub Bot commented on KAFKA-3978:
---

hachikuji closed pull request #4695: KAFKA-3978; highwatermark should always be 
positive
URL: https://github.com/apache/kafka/pull/4695
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 3b97671524d..68faf00c079 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -460,7 +460,11 @@ class Partition(val topic: String,
     }.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
     val oldHighWatermark = leaderReplica.highWatermark
-    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
+
+    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
+    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
+    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
+      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
       leaderReplica.highWatermark = newHighWatermark
       debug(s"High watermark updated to $newHighWatermark")
       true
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index e41e389e22d..030e5b7eb58 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -138,6 +138,9 @@ class Replica(val brokerId: Int,
 
   def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
     if (isLocal) {
+      if (newHighWatermark.messageOffset < 0)
+        throw new IllegalArgumentException("High watermark offset should be non-negative")
+
       highWatermarkMetadata = newHighWatermark
       log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
       trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
@@ -165,9 +168,16 @@ class Replica(val brokerId: Int,
       s"non-local replica $brokerId"))
   }
 
-  def convertHWToLocalOffsetMetadata() = {
+  /*
+   * Convert hw to local offset metadata by reading the log at the hw offset.
+   * If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
+   */
+  def convertHWToLocalOffsetMetadata() {
     if (isLocal) {
-      highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
+      highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset).getOrElse {
+        val firstOffset = log.get.logSegments.head.baseOffset
+        new LogOffsetMetadata(firstOffset, firstOffset, 0)
+      }
     } else {
       throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId")
     }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 257dd8f9ba4..f0050f54aef 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1126,14 +1126,14 @@ class Log(@volatile var dir: File,
 
   /**
    * Given a message offset, find its corresponding offset metadata in the log.
-   * If the message offset is out of range, return unknown offset metadata
+   * If the message offset is out of range, return None to the caller.
    */
-  def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {
+  def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
     try {
       val fetchDataInfo = readUncommitted(offset, 1)
-      fetchDataInfo.fetchOffsetMetadata
+      Some(fetchDataInfo.fetchOffsetMetadata)
     } catch {
-      case _: OffsetOutOfRangeException => LogOffsetMetadata.UnknownOffsetMetadata
+      case _: OffsetOutOfRangeException => None
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 2a24a37f151..0c41519d211 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -76,6 +76,32 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging {
 

[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean

2018-03-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398119#comment-16398119
 ] 

Matthias J. Sax commented on KAFKA-6647:


[~gbloggs] Thanks for reporting the issue. What I am wondering is why there is 
a .lock file in the task directory in the first place. On a clean shutdown, all 
lock files should be released.

Thus, an existing .lock file indicates that some thread is actually using the 
task directory, and thus it should not be deleted (ie, it's expected that 
cleanup() fails in this case).

Also note that a thread can delete a directory even if the directory contains 
its own lock file (because the thread owns that lock).

Does this make sense?
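
For context, here is a minimal, hypothetical sketch of the locking pattern 
under discussion (illustrative names, not the actual StateDirectory code). It 
shows why an open .lock file cannot be deleted on Windows, which in turn makes 
the directory delete fail:
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class LockFileSketch {
    public static void main(String[] args) throws IOException {
        Path taskDir = Paths.get("state", "0_0");
        Files.createDirectories(taskDir);
        Path lockFile = taskDir.resolve(".lock");

        // Acquiring the lock creates the .lock file inside the very
        // directory that the cleanup may later try to delete.
        try (FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileLock lock = channel.tryLock()) {
            if (lock != null) {
                // On Windows, deleting lockFile here fails with
                // AccessDeniedException because the channel is still open;
                // the subsequent directory delete then fails with
                // DirectoryNotEmptyException.
            }
        }
        // Once the channel is closed, both deletes succeed.
        Files.deleteIfExists(lockFile);
        Files.deleteIfExists(taskDir);
    }
}
{code}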

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean
> 
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>   if (lock(id, 0)) {
> long now = time.milliseconds();
> long lastModifiedMs = taskDir.lastModified();
> if (now > lastModifiedMs + cleanupDelayMs) {
> log.info("{} Deleting obsolete state directory {} 
> for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), 
> dirName, id, now - lastModifiedMs, cleanupDelayMs);
> Utils.delete(taskDir);
> }
> }
> {code}
> The check for lock(id,0) will create a .lock file in the directory that 
> subsequently is going to be deleted. If the .lock file already exists from a 
> previous run the attempt to delete the .lock file fails with 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path calling Files.delete(path).
> The call to files.delete(path) in postVisitDirectory will then fail 
> java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory  
>  : stream-thread [restartedMain] Failed to lock the state directory due 
> to an unexpected exception)
> This seems to then cause issues using streams from a topic to an inMemory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6646) Add a GlobalKStream object type for stream event broadcast

2018-03-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6646:
---
Labels: needs-kip  (was: )

> Add a GlobalKStream object type for stream event broadcast
> --
>
> Key: KAFKA-6646
> URL: https://issues.apache.org/jira/browse/KAFKA-6646
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
>
> There are some use cases where having a global KStream object is useful. For 
> example, where a single event is sent, with very low frequency, to a cluster 
> of Kafka stream nodes to trigger all nodes to do some processing of state 
> stored on their instance.
> Workaround currently is to either create a second kstream app instance, being 
> careful to configure it with a different state dir, and give it a unique app 
> name per instance, then create a kstream in each one. Or - you can use the 
> normal consumer client inside your kstream app with unique consumer groups.
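
As a rough illustration of the second workaround mentioned above, here is a 
hedged sketch of a plain consumer with a unique (per-instance) group id, so 
that every instance receives every broadcast event; the topic name and configs 
are made up for the example:
{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BroadcastListener {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A unique group per instance means no partitions are split across
        // instances: every instance sees every broadcast event.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "broadcast-" + UUID.randomUUID());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("broadcast-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100L);
                for (ConsumerRecord<String, String> record : records) {
                    // Trigger processing of this instance's local state here.
                }
            }
        }
    }
}
{code}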



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6645) Host Affinity to facilitate faster restarts of kafka streams applications

2018-03-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398118#comment-16398118
 ] 

Matthias J. Sax commented on KAFKA-6645:


This should be supported already. On startup, Kafka Streams inspects its local 
state directory and adds the available stores to the rebalance metadata (ie, as 
prev-assigned standby tasks). This allows partitions to be reassigned 
accordingly to avoid state migration.

Note that the upcoming 1.1 release contains some improvements to partition 
assignment. This Jira might still be valid in order to improve the existing 
strategy further. It would be great if you could try out the 1.0 or, better, 
the 1.1 release and check whether Kafka Streams behaves as expected. If not, it 
would be great to learn what is missing in detail so we can close those gaps.

Thanks a lot!
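
For reference, a hedged configuration sketch using the public StreamsConfig 
keys (the values are illustrative): a stable application.id and a preserved 
state.dir are what let a restarted instance re-advertise its existing local 
stores during rebalance, and standby replicas keep warm copies on other 
instances.
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateReuseConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Stable across restarts, so prior task assignments can be recognized.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Local store state survives a restart as long as this directory does.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        // Standby replicas keep warm copies elsewhere to cut migration cost.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}
{code}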

> Host Affinity to facilitate faster restarts of kafka streams applications
> -
>
> Key: KAFKA-6645
> URL: https://issues.apache.org/jira/browse/KAFKA-6645
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Giridhar Addepalli
>Priority: Major
>
> Since Kafka Streams applications generally have a lot of state in their 
> stores, it would be good to remember the assignment of partitions to 
> machines, so that when the whole application is restarted for some reason, 
> the past assignment can be reused and there is no need to rebuild the whole 
> state by reading off of the changelog kafka topic. This would result in 
> faster start-up.
> Samza has support for Host Affinity 
> ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html])
> KIP-54 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
> handles cases where some members of a consumer group go down / come up, and 
> ensures there is a minimal diff between the assignments before and after a 
> rebalance.
> But to handle the whole-restart use case, we need to remember the past 
> assignment somewhere, and use it after the restart.
> Please let us know if this is an already solved problem, or whether there is 
> a cleaner way of achieving this objective.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6645) Host Affinity to facilitate faster restarts of kafka streams applications

2018-03-13 Thread Giridhar Addepalli (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Giridhar Addepalli updated KAFKA-6645:
--
Description: 
Since Kafka Streams applications generally have a lot of state in their stores, 
it would be good to remember the assignment of partitions to machines, so that 
when the whole application is restarted for some reason, the past assignment 
can be reused and there is no need to rebuild the whole state by reading off of 
the changelog kafka topic. This would result in faster start-up.

Samza has support for Host Affinity 
([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html])

KIP-54 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
handles cases where some members of a consumer group go down / come up, and 
ensures there is a minimal diff between the assignments before and after a 
rebalance.

But to handle the whole-restart use case, we need to remember the past 
assignment somewhere, and use it after the restart.

Please let us know if this is an already solved problem, or whether there is a 
cleaner way of achieving this objective.

  was:
Since Kafka Streams applications generally have a lot of state in their stores, 
it would be good to remember the assignment of partitions to machines, so that 
when the whole application is restarted for whatever reason, the past 
assignment can be reused and there is no need to build up the state by reading 
off of the changelog kafka topic, which would result in faster start-up.

Samza has support for Host Affinity 
([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html])

KIP-54 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
handles cases where some members of a consumer group go down / come up, and 
ensures there is a minimal diff between the assignments before and after a 
rebalance.

But to handle the whole-restart use case, we need to remember the past 
assignment somewhere, and use it after the restart.

Please let us know if this is an already solved problem, or whether there is a 
cleaner way of achieving this objective.


> Host Affinity to facilitate faster restarts of kafka streams applications
> -
>
> Key: KAFKA-6645
> URL: https://issues.apache.org/jira/browse/KAFKA-6645
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Giridhar Addepalli
>Priority: Major
>
> Since Kafka Streams applications generally have a lot of state in their 
> stores, it would be good to remember the assignment of partitions to 
> machines, so that when the whole application is restarted for some reason, 
> the past assignment can be reused and there is no need to rebuild the whole 
> state by reading off of the changelog kafka topic. This would result in 
> faster start-up.
> Samza has support for Host Affinity 
> ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html])
> KIP-54 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
> handles cases where some members of a consumer group go down / come up, and 
> ensures there is a minimal diff between the assignments before and after a 
> rebalance.
> But to handle the whole-restart use case, we need to remember the past 
> assignment somewhere, and use it after the restart.
> Please let us know if this is an already solved problem, or whether there is 
> a cleaner way of achieving this objective.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398074#comment-16398074
 ] 

ASF GitHub Bot commented on KAFKA-6649:
---

huxihx opened a new pull request #4707: KAFKA-6649: Should catch 
OutOfRangeException for ReplicaFetcherThread
URL: https://github.com/apache/kafka/pull/4707
 
 
   https://issues.apache.org/jira/browse/KAFKA-6649
   
   `AbstractFetcherThread.processFetchRequest` should catch 
OffsetOutOfRangeException so that the thread is not forcibly stopped.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replication partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Examining the ShutdownableThread.scala object further:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> 

[jira] [Commented] (KAFKA-6500) Can not build aggregatedJavadoc

2018-03-13 Thread Jimin Hsieh (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398033#comment-16398033
 ] 

Jimin Hsieh commented on KAFKA-6500:


Could you provide more detail? Which version or commit of Kafka are you trying 
to build? I tried
{code:bash}
./gradlew aggregatedJavadoc
{code}
with openjdk 7 & 8 and gradle 4.6 without any issues.

> Can not build aggregatedJavadoc
> ---
>
> Key: KAFKA-6500
> URL: https://issues.apache.org/jira/browse/KAFKA-6500
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Pavel Timofeev
>Priority: Minor
>
> I'm trying to build kafka according to the instruction provided on github - 
> https://github.com/apache/kafka
> I followed the instruction and managed to build a jar with
> {code}
> ./gradlew jar
> {code}
> But I'm unable to build aggregated javadoc with
> {code}
> ./gradlew aggregatedJavadoc
> {code}
> Neither with openjdk 1.7 nor 1.8.
> It tells me
> {noformat}
> FAILURE: Build failed with an exception.
> * What went wrong:
> A problem was found with the configuration of task ':aggregatedJavadoc'.
> > No value has been specified for property 'outputDirectory'.
> {noformat}
> Is the instruction missing some steps?
> Is it a bug in the build process?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6651) SchemaBuilder should not allow Arrays or Maps to be created by type()

2018-03-13 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397962#comment-16397962
 ] 

huxihx commented on KAFKA-6651:
---

Instead of throwing exceptions, we could possibly add a couple of `setters` for 
`keySchema` and `valueSchema`. 
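
For reference, a short sketch contrasting the existing SchemaBuilder factory 
methods, which take the key/value schemas up front, with the type() call from 
the report (the wrapper class is just for illustration):
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SchemaBuilderSketch {
    public static void main(String[] args) {
        // Supported path: element/key/value schemas are supplied up front.
        Schema arraySchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
        Schema mapSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build();

        // Problematic path from the report: type() accepts ARRAY/MAP, but the
        // returned builder offers no way to set keySchema()/valueSchema() later.
        SchemaBuilder incomplete = SchemaBuilder.type(Schema.Type.ARRAY);
        System.out.println(arraySchema.valueSchema()); // the STRING schema
        System.out.println(mapSchema.keySchema());     // the STRING schema
        System.out.println(incomplete.valueSchema());  // null
    }
}
{code}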

> SchemaBuilder should not allow Arrays or Maps to be created by type()
> -
>
> Key: KAFKA-6651
> URL: https://issues.apache.org/jira/browse/KAFKA-6651
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Priority: Minor
>
> The following code should throw an exception because we cannot set 
> valueSchema() or keySchema() once the builder is returned. 
> {code:java}
> SchemaBuilder.type(Schema.Type.ARRAY);
> SchemaBuilder.type(Schema.Type.MAP);{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state

2018-03-13 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6652:
-

 Summary: The controller should log failed attempts to transition a 
replica to OfflineReplica state
 Key: KAFKA-6652
 URL: https://issues.apache.org/jira/browse/KAFKA-6652
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


In certain conditions, the controller's attempt to transition a replica to 
OfflineReplica state could fail, e.g. the condition described in 
[KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that 
happens, there should be logs to indicate the failed state transitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397789#comment-16397789
 ] 

ASF GitHub Bot commented on KAFKA-6052:
---

vahidhashemian opened a new pull request #4705: KAFKA-6052: Fix the request 
retry issue (on Windows) in InterBrokerSendThread
URL: https://github.com/apache/kafka/pull/4705
 
 
   This resolves the issue detected on Windows. It's a follow-up to the 
investigation done by @hachikuji (details on the JIRA).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Windows: Consumers not polling when isolation.level=read_committed 
> ---
>
> Key: KAFKA-6052
> URL: https://issues.apache.org/jira/browse/KAFKA-6052
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, producer 
>Affects Versions: 0.11.0.0
> Environment: Windows 10. All processes running in embedded mode.
>Reporter: Ansel Zandegran
>Priority: Major
>  Labels: windows
> Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, 
> logFile.log
>
>
> *The same code is running fine in Linux.* I am trying to send a transactional 
> record with exactly once schematics. These are my producer, consumer and 
> broker setups. 
> public void sendWithTTemp(String topic, EHEvent event) {
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>   "localhost:9092,localhost:9093,localhost:9094");
> //props.put("bootstrap.servers", 
> "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092");
> props.put(ProducerConfig.ACKS_CONFIG, "all");
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
> props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
> props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
> props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
> props.put("transactional.id", "TID" + transactionId.incrementAndGet());
> props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000");
> Producer<String, String> producer =
> new KafkaProducer<>(props,
> new StringSerializer(),
> new StringSerializer());
> Logger.log(this, "Initializing transaction...");
> producer.initTransactions();
> Logger.log(this, "Initializing done.");
> try {
>   Logger.log(this, "Begin transaction...");
>   producer.beginTransaction();
>   Logger.log(this, "Begin transaction done.");
>   Logger.log(this, "Sending events...");
>   producer.send(new ProducerRecord<>(topic,
>  event.getKey().toString(),
>  event.getValue().toString()));
>   Logger.log(this, "Sending events done.");
>   Logger.log(this, "Committing...");
>   producer.commitTransaction();
>   Logger.log(this, "Committing done.");
> } catch (ProducerFencedException | OutOfOrderSequenceException
> | AuthorizationException e) {
>   producer.close();
>   e.printStackTrace();
> } catch (KafkaException e) {
>   producer.abortTransaction();
>   e.printStackTrace();
> }
> producer.close();
>   }
> *In Consumer*
> I have set isolation.level=read_committed
> *In 3 Brokers*
> I'm running with the following properties
>   Properties props = new Properties();
>   props.setProperty("broker.id", "" + i);
>   props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i));
>   props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + 
> i);
>   props.setProperty("num.partitions", "1");
>   props.setProperty("zookeeper.connect", "localhost:2181");
>   props.setProperty("zookeeper.connection.timeout.ms", "6000");
>   props.setProperty("min.insync.replicas", "2");
>   props.setProperty("offsets.topic.replication.factor", "2");
>   props.setProperty("offsets.topic.num.partitions", "1");
>   props.setProperty("transaction.state.log.num.partitions", "2");
>   props.setProperty("transaction.state.log.replication.factor", "2");
>   props.setProperty("transaction.state.log.min.isr", "2");
> I am not getting any records in the consumer. When I set 
> isolation.level=read_uncommitted, I get the records. I assume that the 
> records are not getting 

[jira] [Created] (KAFKA-6651) SchemaBuilder should not allow Arrays or Maps to be created by type()

2018-03-13 Thread Jeremy Custenborder (JIRA)
Jeremy Custenborder created KAFKA-6651:
--

 Summary: SchemaBuilder should not allow Arrays or Maps to be 
created by type()
 Key: KAFKA-6651
 URL: https://issues.apache.org/jira/browse/KAFKA-6651
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Jeremy Custenborder


The following code should throw an exception because we cannot set 
valueSchema() or keySchema() once the builder is returned. 
{code:java}
SchemaBuilder.type(Schema.Type.ARRAY);
SchemaBuilder.type(Schema.Type.MAP);{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6650) The controller should be able to handle a partially deleted topic

2018-03-13 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6650:
-

 Summary: The controller should be able to handle a partially 
deleted topic
 Key: KAFKA-6650
 URL: https://issues.apache.org/jira/browse/KAFKA-6650
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang
Assignee: Lucas Wang


A previous controller could have deleted some partitions of a topic from ZK, 
but not all partitions, and then died.
In that case, the new controller should be able to handle the partially deleted 
topic, and finish the deletion.

In the current code base, if there is no leadership info for a replica's 
partition, the transition to OfflineReplica state for the replica will fail. 
Afterwards the transition to ReplicaDeletionStarted will fail as well since the 
only valid previous state for ReplicaDeletionStarted is OfflineReplica. 
Furthermore, it means the topic deletion will never finish.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-13 Thread Julio Ng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julio Ng updated KAFKA-6649:

Affects Version/s: 1.0.0
   0.11.0.2
   1.0.1

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replication partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Examining the ShutdownableThread.scala object further:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of the ShutdownableThread, or whether the exception should be caught and we 
> should keep calling doWork().
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-13 Thread Julio Ng (JIRA)
Julio Ng created KAFKA-6649:
---

 Summary: ReplicaFetcher stopped after non fatal exception is thrown
 Key: KAFKA-6649
 URL: https://issues.apache.org/jira/browse/KAFKA-6649
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 1.1.0
Reporter: Julio Ng


We have seen several under-replication partitions, usually triggered by topic 
creation. After digging in the logs, we see the below:
{noformat}
[2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: Error processing data for partition 
[[TOPIC_NAME_REMOVED]]-84 offset 2098535
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
 at scala.Option.foreach(Option.scala:257)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
increment the log start offset to 2098535 of partition 
[[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
[2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
It looks like after the ReplicaFetcherThread is stopped, the replicas 
start to lag behind, presumably because we are not fetching from the leader 
anymore. Examining the ShutdownableThread.scala object further:
{noformat}
override def run(): Unit = {
 info("Starting")
 try {
   while (isRunning)
 doWork()
 } catch {
   case e: FatalExitError =>
 shutdownInitiated.countDown()
 shutdownComplete.countDown()
 info("Stopped")
 Exit.exit(e.statusCode())
   case e: Throwable =>
 if (isRunning)
   error("Error due to", e)
 } finally {
   shutdownComplete.countDown()
 }
 info("Stopped")
}{noformat}
For the Throwable (non-fatal) case, it just exits the while loop and the thread 
stops doing work. I am not sure whether this is the intended behavior of the 
ShutdownableThread, or whether the exception should be caught and we should 
keep calling doWork().
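
A hypothetical sketch of the alternative behavior the reporter asks about 
(this is not the actual ShutdownableThread, just an illustration): catching 
non-fatal exceptions per iteration keeps the loop alive instead of letting a 
single error silently stop the fetcher.
{code:java}
public abstract class ResilientLoopThread extends Thread {
    private volatile boolean running = true;

    protected abstract void doWork() throws Exception;

    @Override
    public void run() {
        while (running) {
            try {
                doWork();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false; // treat interruption as shutdown
            } catch (Exception e) {
                // Non-fatal: log and keep calling doWork() instead of
                // letting the exception end the loop (and the fetcher).
                System.err.println("Error due to " + e + "; continuing");
            }
        }
    }

    public void shutdown() {
        running = false;
        interrupt();
    }
}
{code}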

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean

2018-03-13 Thread George Bloggs (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397437#comment-16397437
 ] 

George Bloggs commented on KAFKA-6647:
--

Hi. It does remove the lock file in the finally block, but this is too late: 
the attempt to delete the directory has already failed with the exception, 
because the lock file still existed in the directory it was trying to delete.

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean
> 
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>   if (lock(id, 0)) {
> long now = time.milliseconds();
> long lastModifiedMs = taskDir.lastModified();
> if (now > lastModifiedMs + cleanupDelayMs) {
> log.info("{} Deleting obsolete state directory {} 
> for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), 
> dirName, id, now - lastModifiedMs, cleanupDelayMs);
> Utils.delete(taskDir);
> }
> }
> {code}
> The check for lock(id,0) will create a .lock file in the directory that 
> subsequently is going to be deleted. If the .lock file already exists from a 
> previous run the attempt to delete the .lock file fails with 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path calling Files.delete(path).
> The call to files.delete(path) in postVisitDirectory will then fail 
> java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory  
>  : stream-thread [restartedMain] Failed to lock the state directory due 
> to an unexpected exception)
> This seems to then cause issues using streams from a topic to an inMemory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397429#comment-16397429
 ] 

ASF GitHub Bot commented on KAFKA-6647:
---

tedyu opened a new pull request #4702: KAFKA-6647 KafkaStreams.cleanUp creates 
.lock file in directory it tries to clean
URL: https://github.com/apache/kafka/pull/4702
 
 
   Specify StandardOpenOption#DELETE_ON_CLOSE when creating the FileChannel.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
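
A minimal sketch of the approach the PR describes, assuming standard java.nio 
semantics for StandardOpenOption.DELETE_ON_CLOSE (the surrounding method is 
illustrative): the lock file is removed by the OS when the channel closes, so 
no separate delete call that can fail on Windows is needed.
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DeleteOnCloseLock {
    static void lockAndClean(Path lockFile) throws IOException {
        try (FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.DELETE_ON_CLOSE);
             FileLock lock = channel.tryLock()) {
            if (lock != null) {
                // Clean the task directory while holding the lock.
            }
        }
        // Closing the channel removes the .lock file itself, so the later
        // directory delete no longer trips over a leftover lock file.
    }
}
{code}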


> KafkaStreams.cleanUp creates .lock file in directory its trying to clean
> 
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
>   if (lock(id, 0)) {
> long now = time.milliseconds();
> long lastModifiedMs = taskDir.lastModified();
> if (now > lastModifiedMs + cleanupDelayMs) {
> log.info("{} Deleting obsolete state directory {} 
> for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), 
> dirName, id, now - lastModifiedMs, cleanupDelayMs);
> Utils.delete(taskDir);
> }
> }
> {code}
> The check for lock(id,0) will create a .lock file in the directory that 
> subsequently is going to be deleted. If the .lock file already exists from a 
> previous run the attempt to delete the .lock file fails with 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path calling Files.delete(path).
> The call to files.delete(path) in postVisitDirectory will then fail 
> java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory  
>  : stream-thread [restartedMain] Failed to lock the state directory due 
> to an unexpected exception)
> This seems to then cause issues using streams from a topic to an inMemory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop

2018-03-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5697:
---
Description: 
In {{StreamThread.shutdown()}} we currently do nothing but set the state, 
hoping the stream thread may eventually check it and shutdown itself. However, 
under certain scenarios the thread may get blocked within a single loop and 
hence will never check on this state enum. For example, it's {{consumer.poll}} 
call trigger {{ensureCoordinatorReady()}} which will block until the 
coordinator can be found. If the coordinator broker is never up and running 
then the Stream instance will be blocked forever.

A simple way to produce this issue is to start the work count demo without 
starting the ZK / Kafka broker, and then it will get stuck in a single loop and 
even `ctrl-C` will not stop it since its set state will never be read by the 
thread:
{code:java}
[2017-08-03 15:17:39,981] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,046] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,101] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,206] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,261] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,366] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,472] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)
{code}
 

  was:
In {{StreamThread.close()}} we currently do nothing but set the state, hoping 
the stream thread may eventually check it and shut itself down. However, under 
certain scenarios the thread may get blocked within a single loop and hence 
will never check on this state enum. For example, its {{consumer.poll}} call 
triggers {{ensureCoordinatorReady()}}, which will block until the coordinator 
can be found. If the coordinator broker is never up and running, then the 
Streams instance will be blocked forever.

A simple way to reproduce this issue is to start the word count demo without 
starting the ZK / Kafka broker; it will then get stuck in a single loop and 
even `ctrl-C` will not stop it, since its set state will never be read by the 
thread:
{code:java}
[2017-08-03 15:17:39,981] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,046] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,101] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,206] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,261] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,366] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,472] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)
{code}
 


> StreamThread.shutdown() need to interrupt the stream threads to break the loop
> --
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: John Roesler
>Priority: Major
>  Labels: newbie
>
> In {{StreamThread.shutdown()}} we currently do nothing but set the state, 
> hoping the stream thread may eventually check it and shut itself down. 
> However, under certain scenarios the thread may get blocked within a single 
> loop and hence will never check on this state enum. For example, its 
> {{consumer.poll}} call 

[jira] [Assigned] (KAFKA-6376) Improve Streams metrics for skipped records

2018-03-13 Thread John Roesler (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-6376:
---

Assignee: John Roesler

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop record with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop

2018-03-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5697:
---
Summary: StreamThread.shutdown() need to interrupt the stream threads to 
break the loop  (was: StreamThread.close() need to interrupt the stream threads 
to break the loop)

> StreamThread.shutdown() need to interrupt the stream threads to break the loop
> --
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: John Roesler
>Priority: Major
>  Labels: newbie
>
> In {{StreamThread.close()}} we currently do nothing but set the state, hoping 
> the stream thread may eventually check it and shut itself down. However, 
> under certain scenarios the thread may get blocked within a single loop and 
> hence will never check on this state enum. For example, its 
> {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which will 
> block until the coordinator can be found. If the coordinator broker is never 
> up and running, then the Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without 
> starting the ZK / Kafka broker; it will then get stuck in a single loop and 
> even `ctrl-C` will not stop it, since its set state will never be read by 
> the thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
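
One known way to break out of such a blocked poll is 
{{KafkaConsumer.wakeup()}}, which makes a blocked {{poll()}} throw 
WakeupException. The sketch below is illustrative wiring, not the actual 
StreamThread code:
{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class InterruptiblePollLoop {
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    public InterruptiblePollLoop(Properties props) {
        // props must include bootstrap.servers and key/value deserializers.
        this.consumer = new KafkaConsumer<>(props);
    }

    public void runLoop() {
        try {
            consumer.subscribe(Collections.singletonList("input-topic"));
            while (!shuttingDown.get()) {
                consumer.poll(100L); // may block in ensureCoordinatorReady()
            }
        } catch (WakeupException e) {
            // Expected on shutdown: wakeup() aborts a blocked poll().
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        shuttingDown.set(true);
        consumer.wakeup(); // breaks a poll() blocked waiting for the coordinator
    }
}
{code}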


[jira] [Assigned] (KAFKA-5697) StreamThread.close() need to interrupt the stream threads to break the loop

2018-03-13 Thread John Roesler (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-5697:
---

Assignee: John Roesler

> StreamThread.close() need to interrupt the stream threads to break the loop
> ---
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: John Roesler
>Priority: Major
>  Labels: newbie
>
> In {{StreamThread.close()}} we currently do nothing but set the state, hoping 
> the stream thread may eventually check it and shut itself down. However, 
> under certain scenarios the thread may get blocked within a single loop and 
> hence will never check on this state enum. For example, its 
> {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which will 
> block until the coordinator can be found. If the coordinator broker is never 
> up and running, then the Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without 
> starting the ZK / Kafka broker; it will then get stuck in a single loop and 
> even `ctrl-C` will not stop it, since its set state will never be read by 
> the thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-4730) Streams does not have an in-memory windowed store

2018-03-13 Thread John Roesler (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-4730:
---

Assignee: John Roesler  (was: Nikki Thean)

> Streams does not have an in-memory windowed store
> -
>
> Key: KAFKA-4730
> URL: https://issues.apache.org/jira/browse/KAFKA-4730
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: John Roesler
>Priority: Major
>
> Streams has windowed persistent stores (e.g., see PersistentKeyValueFactory 
> interface with "windowed" method), however it does not allow for windowed 
> in-memory stores (e.g., see InMemoryKeyValueFactory interface). 
> In addition to the interface not allowing it, streams does not actually have 
> an implementation of an in-memory windowed store.
> The implications are that operations that require windowing cannot use 
> in-memory stores. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean

2018-03-13 Thread George Bloggs (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

George Bloggs updated KAFKA-6647:
-
Environment: 
windows 10.
java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
org.apache.kafka:kafka-streams:1.0.1
Kafka commitId : c0518aa65f25317e

  was:
windows 10.
java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
org.apache.kafka:kafka-streams:1.0.1


> KafkaStreams.cleanUp creates .lock file in directory its trying to clean
> 
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream, the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
> if (lock(id, 0)) {
>     long now = time.milliseconds();
>     long lastModifiedMs = taskDir.lastModified();
>     if (now > lastModifiedMs + cleanupDelayMs) {
>         log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)",
>                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
>         Utils.delete(taskDir);
>     }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that is 
> subsequently going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete it fails with AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory non-empty. (o.a.k.s.p.internals.StateDirectory: 
> stream-thread [restartedMain] Failed to lock the state directory due to an 
> unexpected exception)
> This then seems to cause issues when streaming from a topic to an in-memory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory it's trying to clean

2018-03-13 Thread George Bloggs (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

George Bloggs updated KAFKA-6647:
-
Description: 
When calling kafkaStreams.cleanUp() before starting a stream, the 
StateDirectory.cleanRemovedTasks() method contains this check:
{code:java}
... Line 240
if (lock(id, 0)) {
    long now = time.milliseconds();
    long lastModifiedMs = taskDir.lastModified();
    if (now > lastModifiedMs + cleanupDelayMs) {
        log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)",
                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
        Utils.delete(taskDir);
    }
}
{code}
The check for lock(id, 0) will create a .lock file in the directory that is 
subsequently going to be deleted. If the .lock file already exists from a 
previous run, the attempt to delete it fails with AccessDeniedException.

This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
then attempt to remove the taskDir path by calling Files.delete(path).

The call to Files.delete(path) in postVisitDirectory will then fail with 
java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
.lock file left the directory non-empty. (o.a.k.s.p.internals.StateDirectory: 
stream-thread [restartedMain] Failed to lock the state directory due to an 
unexpected exception)

This then seems to cause issues when streaming from a topic to an in-memory store.

  was:
When calling kafkaStreams.cleanUp() before starting a stream the 
StateDirectory.cleanRemovedTasks() method contains this check:
{code:java}
... Line 240
  if (lock(id, 0)) {
long now = time.milliseconds();
long lastModifiedMs = taskDir.lastModified();
if (now > lastModifiedMs + cleanupDelayMs) {
log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, 
id, now - lastModifiedMs, cleanupDelayMs);
Utils.delete(taskDir);
}
}
{code}
The check for lock(id,0) will create a .lock file in the directory that 
subsequently is going to be deleted. If the .lock file already exists from a 
previous run the attempt to delete the .lock file fails with 
AccessDeniedException.

This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
then attempt to remove the taskDir path calling Files.delete(path).

The call to files.delete(path) in postVisitDirectory will then fail 
java.nio.file.DirectoryNotEmptyException as the failed attempt to delete the 
.lock file left the directory not empty.

This seems to then cause issues using streams from a topic to an in memory 
store.


> KafkaStreams.cleanUp creates .lock file in directory it's trying to clean
> 
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream, the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
> if (lock(id, 0)) {
>     long now = time.milliseconds();
>     long lastModifiedMs = taskDir.lastModified();
>     if (now > lastModifiedMs + cleanupDelayMs) {
>         log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)",
>                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
>         Utils.delete(taskDir);
>     }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that is 
> subsequently going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete it fails with AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail 
> 

[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory it's trying to clean

2018-03-13 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397366#comment-16397366
 ] 

Ted Yu commented on KAFKA-6647:
---

The above code is paired with:
{code}
} finally {
    try {
        unlock(id);
{code}
where unlock() has:
{code}
lockAndOwner.lock.release();
log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);
{code}
Wouldn't the above get rid of the lock file?
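For reference, releasing a FileLock does not remove the backing file, so the
unlock alone would not get rid of .lock. A standalone sketch (not Kafka code)
of the behavior:

{code:java}
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

Path lockFile = Paths.get("state-dir", ".lock");
Files.createDirectories(lockFile.getParent());
try (FileChannel channel = FileChannel.open(lockFile,
        StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
    FileLock lock = channel.tryLock(); // creates .lock as a side effect
    // On Windows, Files.delete(lockFile) here fails while the file is open.
    lock.release(); // releases the OS lock but leaves the file on disk
}
Files.deleteIfExists(lockFile); // only an explicit delete removes the file
{code}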

> KafkaStreams.cleanUp creates .lock file in directory it's trying to clean
> 
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
>Reporter: George Bloggs
>Priority: Minor
>  Labels: streams
>
> When calling kafkaStreams.cleanUp() before starting a stream, the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> ... Line 240
> if (lock(id, 0)) {
>     long now = time.milliseconds();
>     long lastModifiedMs = taskDir.lastModified();
>     if (now > lastModifiedMs + cleanupDelayMs) {
>         log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)",
>                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
>         Utils.delete(taskDir);
>     }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that is 
> subsequently going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete it fails with AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory non-empty.
> This then seems to cause issues when streaming from a topic to an in-memory 
> store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6644) Make Server Info more generic for Kafka Interactive Queries

2018-03-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6644:
---
Description: 
When working to implement *Kafka Streams interactive queries*, I see that I can 
set `application.server` to `host:port`.

*I would like a more generic mechanism to set additional properties.*

I'm using Cloud Foundry containers for my Kafka Streams app. I scale out my 
containers using `*cf scale*`; each gets its own instance id. The *instance id* 
can be used in an HTTP header to get Cloud Foundry to route the HTTP request to 
the correct instance:

[https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing]

I realize, per Matthias J Sax, that Kafka Streams only distributes the 
information but does not use it. Thus, I can put the instance id in the port slot.

Changing this config or adding a new one is a public API change and requires a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

  was:
when working to implement *kafka streams interactive queries*, i see that i can 
set `application.server` with `host:port`

*i would like a more generic mechanism to set additional properties.*

i'm using cloud foundry containers for my kafka streams app. i scale out my 
containers using `*cf scale*`. each gets its own instance id. the *instance id* 
can be used in an http header to get cloud foundry to route the http to the 
correct instance

https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing


i realize, per Matthias J Sax, that Kafka Streams only distributes the 
information but does not use it. Thus, i can put the instance-id as the port.


> Make Server Info more generic for Kafka Interactive Queries
> ---
>
> Key: KAFKA-6644
> URL: https://issues.apache.org/jira/browse/KAFKA-6644
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, streams
>Reporter: Mike Graham
>Priority: Minor
>  Labels: needs-kip, newbie
>
> When working to implement *Kafka Streams interactive queries*, I see that I 
> can set `application.server` to `host:port`.
> *I would like a more generic mechanism to set additional properties.*
> I'm using Cloud Foundry containers for my Kafka Streams app. I scale out my 
> containers using `*cf scale*`; each gets its own instance id. The *instance 
> id* can be used in an HTTP header to get Cloud Foundry to route the HTTP 
> request to the correct instance:
> [https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing]
> I realize, per Matthias J Sax, that Kafka Streams only distributes the 
> information but does not use it. Thus, I can put the instance id in the port slot.
> Changing this config or adding a new one is a public API change and requires a 
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6644) Make Server Info more generic for Kafka Interactive Queries

2018-03-13 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6644:
---
Labels: needs-kip newbie  (was: )

> Make Server Info more generic for Kafka Interactive Queries
> ---
>
> Key: KAFKA-6644
> URL: https://issues.apache.org/jira/browse/KAFKA-6644
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, streams
>Reporter: Mike Graham
>Priority: Minor
>  Labels: needs-kip, newbie
>
> When working to implement *Kafka Streams interactive queries*, I see that I 
> can set `application.server` to `host:port`.
> *I would like a more generic mechanism to set additional properties.*
> I'm using Cloud Foundry containers for my Kafka Streams app. I scale out my 
> containers using `*cf scale*`; each gets its own instance id. The *instance 
> id* can be used in an HTTP header to get Cloud Foundry to route the HTTP 
> request to the correct instance:
> https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing
> I realize, per Matthias J Sax, that Kafka Streams only distributes the 
> information but does not use it. Thus, I can put the instance id in the port slot.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5697) StreamThread.close() needs to interrupt the stream threads to break the loop

2018-03-13 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397347#comment-16397347
 ] 

Ted Yu commented on KAFKA-5697:
---

Looks like StreamThread.close() doesn't exist (after refactoring).


> StreamThread.close() needs to interrupt the stream threads to break the loop
> ---
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>
> In {{StreamThread.close()}} we currently do nothing but set the state, hoping 
> the stream thread may eventually check it and shut itself down. However, under 
> certain scenarios the thread may get blocked within a single loop and hence 
> will never check on this state enum. For example, its {{consumer.poll}} call 
> triggers {{ensureCoordinatorReady()}}, which will block until the coordinator 
> can be found. If the coordinator broker is never up and running, then the 
> Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without 
> starting the ZK / Kafka broker; it will then get stuck in a single loop, and 
> even `ctrl-C` will not stop it since its set state will never be read by the 
> thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  
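For reference, a minimal repro sketch of the scenario described above (it
assumes no broker is listening on localhost:9092; topic names are
hypothetical):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-repro");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // nothing listening here
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").to("output-topic"); // hypothetical topics
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start(); // the polling thread blocks in ensureCoordinatorReady()
streams.close(); // only sets the state enum; the blocked thread never reads it
{code}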



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5697) StreamThread.close() needs to interrupt the stream threads to break the loop

2018-03-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5697:
-
Labels: newbie  (was: )

> StreamThread.close() needs to interrupt the stream threads to break the loop
> ---
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>
> In {{StreamThread.close()}} we currently do nothing but set the state, hoping 
> the stream thread may eventually check it and shut itself down. However, under 
> certain scenarios the thread may get blocked within a single loop and hence 
> will never check on this state enum. For example, its {{consumer.poll}} call 
> triggers {{ensureCoordinatorReady()}}, which will block until the coordinator 
> can be found. If the coordinator broker is never up and running, then the 
> Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without 
> starting the ZK / Kafka broker; it will then get stuck in a single loop, and 
> even `ctrl-C` will not stop it since its set state will never be read by the 
> thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-03-13 Thread radai rosenblatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397306#comment-16397306
 ] 

radai rosenblatt commented on KAFKA-6648:
-

PR is https://github.com/apache/kafka/pull/4679

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
> -
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.0.0
>
>
> {code}
> if (!shouldRetry) {
>     HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>     for (String topic : cluster.topics())
>         topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>     return topicsPartitionInfos;
> }
> {code}
> This leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(): if there's valid metadata, all partitions are 
> returned, whereas if metadata doesn't exist (or has expired), only a subset 
> of partitions (the healthy ones) is returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-03-13 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-6648:

Fix Version/s: 2.0.0

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
> -
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.0.0
>
>
> {code}
> if (!shouldRetry) {
>     HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>     for (String topic : cluster.topics())
>         topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>     return topicsPartitionInfos;
> }
> {code}
> This leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(): if there's valid metadata, all partitions are 
> returned, whereas if metadata doesn't exist (or has expired), only a subset 
> of partitions (the healthy ones) is returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-03-13 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-6648:

Affects Version/s: (was: 1.0.1)
   1.0.0
   0.11.0.2

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
> -
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.0.0
>
>
> {code}
> if (!shouldRetry) {
>     HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>     for (String topic : cluster.topics())
>         topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>     return topicsPartitionInfos;
> }
> {code}
> This leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(): if there's valid metadata, all partitions are 
> returned, whereas if metadata doesn't exist (or has expired), only a subset 
> of partitions (the healthy ones) is returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-03-13 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6648:
---

 Summary: Fetcher.getTopicMetadata() only returns "healthy" 
partitions, not all
 Key: KAFKA-6648
 URL: https://issues.apache.org/jira/browse/KAFKA-6648
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.1
Reporter: radai rosenblatt
Assignee: radai rosenblatt


{code}
if (!shouldRetry) {
    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
    for (String topic : cluster.topics())
        topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
    return topicsPartitionInfos;
}
{code}

This leads to inconsistent behavior upstream, for example in 
KafkaConsumer.partitionsFor(): if there's valid metadata, all partitions are 
returned, whereas if metadata doesn't exist (or has expired), only a subset of 
partitions (the healthy ones) is returned.
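A hedged sketch of one consistent alternative (the actual fix is in the PR
referenced in the comments above); the only change is returning all known
partitions rather than just the available ones:

{code}
if (!shouldRetry) {
    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
    for (String topic : cluster.topics())
        // partitionsForTopic returns every known partition, leaderless ones included
        topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic));
    return topicsPartitionInfos;
}
{code}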



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory it's trying to clean

2018-03-13 Thread George Bloggs (JIRA)
George Bloggs created KAFKA-6647:


 Summary: KafkaStreams.cleanUp creates .lock file in directory it's 
trying to clean
 Key: KAFKA-6647
 URL: https://issues.apache.org/jira/browse/KAFKA-6647
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.1
 Environment: windows 10.
java version "1.8.0_162"
Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
org.apache.kafka:kafka-streams:1.0.1
Reporter: George Bloggs


When calling kafkaStreams.cleanUp() before starting a stream, the 
StateDirectory.cleanRemovedTasks() method contains this check:
{code:java}
... Line 240
if (lock(id, 0)) {
    long now = time.milliseconds();
    long lastModifiedMs = taskDir.lastModified();
    if (now > lastModifiedMs + cleanupDelayMs) {
        log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)",
                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
        Utils.delete(taskDir);
    }
}
{code}
The check for lock(id, 0) will create a .lock file in the directory that is 
subsequently going to be deleted. If the .lock file already exists from a 
previous run, the attempt to delete it fails with AccessDeniedException.

This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
then attempt to remove the taskDir path by calling Files.delete(path).

The call to Files.delete(path) in postVisitDirectory will then fail with 
java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
.lock file left the directory non-empty.

This then seems to cause issues when streaming from a topic to an in-memory 
store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6646) Add a GlobalKStream object type for stream event broadcast

2018-03-13 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-6646:


 Summary: Add a GlobalKStream object type for stream event broadcast
 Key: KAFKA-6646
 URL: https://issues.apache.org/jira/browse/KAFKA-6646
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Affects Versions: 1.1.0
Reporter: Antony Stubbs


There are some use cases where having a global KStream object is useful. For 
example, where a single event is sent, with very low frequency, to a cluster of 
Kafka Streams nodes to trigger all nodes to do some processing of the state 
stored on their instance.

The current workaround is either to create a second Kafka Streams app instance, 
being careful to configure it with a different state dir and give it a unique 
app name per instance, and then create a KStream in each one; or to use the 
normal consumer client inside your Kafka Streams app with a unique consumer 
group per instance (a sketch of the latter follows below).
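A hedged sketch of the second workaround (topic name and handler are
hypothetical): each instance joins its own consumer group, so every instance
receives every broadcast event.

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "broadcast-" + UUID.randomUUID()); // unique per instance
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("broadcast-events"));
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(1000))
            handleBroadcast(record.value()); // hypothetical handler
    }
}
{code}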



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4027) Leader for a certain partition unavailable forever

2018-03-13 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397208#comment-16397208
 ] 

Manikumar commented on KAFKA-4027:
--

More details about a similar exception are here: 
https://issues.apache.org/jira/browse/KAFKA-5758

> Leader for a certain partition unavailable forever
> -
>
> Key: KAFKA-4027
> URL: https://issues.apache.org/jira/browse/KAFKA-4027
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: tu nguyen khac
>Priority: Major
>
> I have a cluster of 9 brokers, which I number from 0 to 8. 
> Yesterday some servers went down (hard reset). I restarted the downed 
> servers, but after that some topics could not assign a leader. 
> I checked the server log and retrieved this logging: 
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
> 6's position -1 since the replica is not recognized to be one of the assigned 
> replicas 1 for partition [tos_htv3tv.com,31].
>   at 
> kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:251)
>   at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:864)
>   at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:861)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at 
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:861)
>   at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:470)
>   at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:496)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:77)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> I tried to run the preferred leader election, but it didn't work (some 
> partitions have no leader) :(



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6645) Host Affinity to facilitate faster restarts of kafka streams applications

2018-03-13 Thread Giridhar Addepalli (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Giridhar Addepalli updated KAFKA-6645:
--
Summary: Host Affinity to facilitate faster restarts of kafka streams 
applications  (was: Sticky Partition Assignment to facilitate faster restarts 
of kafka streams applications)

> Host Affinity to facilitate faster restarts of kafka streams applications
> -
>
> Key: KAFKA-6645
> URL: https://issues.apache.org/jira/browse/KAFKA-6645
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Giridhar Addepalli
>Priority: Major
>
> Since Kafka Streams applications in general have a lot of state in their 
> stores, it would be good to remember the assignment of partitions to 
> machines, so that when the whole application is restarted for whatever 
> reason, the past assignment of partitions to machines can be reused. There 
> would then be no need to build up state by reading off of the changelog Kafka 
> topic, which would result in faster start-up.
> Samza has support for Host Affinity 
> ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]).
> KIP-54 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
>  handles cases where some members of the consumer group go down / come up, 
> and ensures there is a minimal diff between assignments before and after a 
> rebalance. 
> But to handle the whole-restart use case, we need to remember the past 
> assignment somewhere, and use it after restart.
> Please let us know if this is an already-solved problem or if there is a 
> cleaner way of achieving this objective.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6645) Sticky Partition Assignment to facilitate faster restarts of kafka streams applications

2018-03-13 Thread Giridhar Addepalli (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Giridhar Addepalli updated KAFKA-6645:
--
Description: 
Since Kafka Streams applications in general have a lot of state in their 
stores, it would be good to remember the assignment of partitions to machines, 
so that when the whole application is restarted for whatever reason, the past 
assignment of partitions to machines can be reused. There would then be no need 
to build up state by reading off of the changelog Kafka topic, which would 
result in faster start-up.

Samza has support for Host Affinity 
([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]).

KIP-54 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
 handles cases where some members of the consumer group go down / come up, and 
ensures there is a minimal diff between assignments before and after a 
rebalance. 

But to handle the whole-restart use case, we need to remember the past 
assignment somewhere, and use it after restart.

Please let us know if this is an already-solved problem or if there is a 
cleaner way of achieving this objective.

  was:
Since Kafka Streams applications have lot of state in the stores in the 
general, it would be good to remember the assignment of partitions to machines. 
So that when whole application is restarted for whatever reason, there is a way 
to use past assignment of partitions to machines and there won't be need to 
build up state by reading off of changelog kafka topic and would result in 
faster start-up.

Samza has support for Host Affinity 
(https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html)

KIP-54 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy)]
 , handles cases where some members of consumer group goes down / comes up, and 
KIP-54 ensures there is minimal diff between assignments before and after 
rebalance. 

But to handle whole restart use case, we need to remember past assignment 
somewhere, and use it after restart.

Please let us know if this is already solved problem / some cleaner way of 
achieving this objective


> Sticky Partition Assignment to facilitate faster restarts of kafka streams 
> applications
> ---
>
> Key: KAFKA-6645
> URL: https://issues.apache.org/jira/browse/KAFKA-6645
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Giridhar Addepalli
>Priority: Major
>
> Since Kafka Streams applications in general have a lot of state in their 
> stores, it would be good to remember the assignment of partitions to 
> machines, so that when the whole application is restarted for whatever 
> reason, the past assignment of partitions to machines can be reused. There 
> would then be no need to build up state by reading off of the changelog Kafka 
> topic, which would result in faster start-up.
> Samza has support for Host Affinity 
> ([https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html]).
> KIP-54 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
>  handles cases where some members of the consumer group go down / come up, 
> and ensures there is a minimal diff between assignments before and after a 
> rebalance. 
> But to handle the whole-restart use case, we need to remember the past 
> assignment somewhere, and use it after restart.
> Please let us know if this is an already-solved problem or if there is a 
> cleaner way of achieving this objective.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6634) Delay initiating the txn on producers until initializeTopology with EOS turned on

2018-03-13 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6634.
--
   Resolution: Fixed
Fix Version/s: 1.1.0

> Delay initiating the txn on producers until initializeTopology with EOS 
> turned on
> -
>
> Key: KAFKA-6634
> URL: https://issues.apache.org/jira/browse/KAFKA-6634
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 1.1.0
>
>
> In the Streams EOS implementation, the producers created for tasks will 
> initiate a txn immediately after being created in the constructor of 
> `StreamTask`. However, the task may not process any data, and hence the 
> producer may not send any records for that started txn for a long time, 
> because of the restoration process. With the default txn session timeout 
> valued at 60 seconds, this means that if the restoration takes more than that 
> amount of time, then upon starting, the producer will immediately get the 
> error that its producer epoch is already old.
> To fix this, we should consider initiating the txn only after the restoration 
> phase is done. This has the caveat that if the producer is already fenced, it 
> will not be notified until then, in initializeTopology. But I think this 
> should not be a correctness issue, since during the restoration process we do 
> not make any changes to the processing state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6634) Delay initiating the txn on producers until initializeTopology with EOS turned on

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397130#comment-16397130
 ] 

ASF GitHub Bot commented on KAFKA-6634:
---

guozhangwang closed pull request #4684: KAFKA-6634: Delay starting new 
transaction in task.initializeTopology
URL: https://github.com/apache/kafka/pull/4684
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 8529c9eca88..c806bfde47e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -90,6 +90,7 @@ void addNewTask(final T task) {
      * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     Set<TopicPartition> initializeNewTasks() {
         final Set<TopicPartition> readyPartitions = new HashSet<>();
@@ -240,18 +241,21 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
             log.trace("found suspended {} {}", taskTypeName, taskId);
             if (task.partitions().equals(partitions)) {
                 suspended.remove(taskId);
+                task.resume();
                 try {
-                    task.resume();
+                    transitionToRunning(task, new HashSet<TopicPartition>());
                 } catch (final TaskMigratedException e) {
+                    // we need to catch migration exception internally since this function
+                    // is triggered in the rebalance callback
                     log.info("Failed to resume {} {} since it got migrated to another thread already. " +
                             "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
                     final RuntimeException fatalException = closeZombieTask(task);
+                    running.remove(task.id());
                     if (fatalException != null) {
                         throw fatalException;
                     }
                     throw e;
                 }
-                transitionToRunning(task, new HashSet<TopicPartition>());
                 log.trace("resuming suspended {} {}", taskTypeName, task.id());
                 return true;
             } else {
@@ -271,6 +275,9 @@ private void addToRestoring(final T task) {
         }
     }
 
+    /**
+     * @throws TaskMigratedException if the task producer got fenced (EOS only)
+     */
     private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b8777ad5521..8d6e56a17aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -100,7 +100,6 @@ void removeAllSensors() {
      * @param cache                 the {@link ThreadCache} created by the thread
      * @param time                  the system {@link Time} of the thread
      * @param producer              the instance of {@link Producer} used to produce records
-     * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
     public StreamTask(final TaskId id,
                       final Collection<TopicPartition> partitions,
@@ -149,14 +148,11 @@ public StreamTask(final TaskId id,
         partitionGroup = new PartitionGroup(partitionQueues);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+        // initialize transactions if eos is turned on, which will block if the previous transaction has not
+        // completed yet; do not start the first transaction until the topology has been initialized later
         if (eosEnabled) {
-            try {
-                this.producer.initTransactions();
-                this.producer.beginTransaction();
-            } catch (final ProducerFencedException fatal) {
-                throw new TaskMigratedException(this, fatal);
-            }
-            transactionInFlight = true;
+            this.producer.initTransactions();
         }
     }
 
@@ 

[jira] [Commented] (KAFKA-4027) Leader for a certain partition unavailable forever

2018-03-13 Thread Antonio Verardi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397022#comment-16397022
 ] 

Antonio Verardi commented on KAFKA-4027:


[~tuyuri] I think I am having the exact same issue and on the exact same 
version. Did you ever figure out what caused it?

> Leader for a certain partition unavailable forever
> -
>
> Key: KAFKA-4027
> URL: https://issues.apache.org/jira/browse/KAFKA-4027
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: tu nguyen khac
>Priority: Major
>
> I have a cluster of 9 brokers, which I number from 0 to 8. 
> Yesterday some servers went down (hard reset). I restarted the downed 
> servers, but after that some topics could not assign a leader. 
> I checked the server log and retrieved this logging: 
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
> 6's position -1 since the replica is not recognized to be one of the assigned 
> replicas 1 for partition [tos_htv3tv.com,31].
>   at 
> kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:251)
>   at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:864)
>   at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:861)
>   at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>   at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>   at 
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:861)
>   at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:470)
>   at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:496)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:77)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> I tried to run the preferred leader election, but it didn't work (some 
> partitions have no leader) :(



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6645) Sticky Partition Assignment to facilitate faster restarts of kafka streams applications

2018-03-13 Thread Giridhar Addepalli (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Giridhar Addepalli updated KAFKA-6645:
--
Summary: Sticky Partition Assignment to facilitate faster restarts of kafka 
streams applications  (was: Sticky Partition Assignment across Kafka Streams 
application restarts)

> Sticky Partition Assignment to facilitate faster restarts of kafka streams 
> applications
> ---
>
> Key: KAFKA-6645
> URL: https://issues.apache.org/jira/browse/KAFKA-6645
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Giridhar Addepalli
>Priority: Major
>
> Since Kafka Streams applications in general have a lot of state in their 
> stores, it would be good to remember the assignment of partitions to 
> machines, so that when the whole application is restarted for whatever 
> reason, the past assignment of partitions to machines can be reused. There 
> would then be no need to build up state by reading off of the changelog Kafka 
> topic, which would result in faster start-up.
> Samza has support for Host Affinity 
> (https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html).
> KIP-54 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
>  handles cases where some members of the consumer group go down / come up, 
> and ensures there is a minimal diff between assignments before and after a 
> rebalance. 
> But to handle the whole-restart use case, we need to remember the past 
> assignment somewhere, and use it after restart.
> Please let us know if this is an already-solved problem or if there is a 
> cleaner way of achieving this objective.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6645) Sticky Partition Assignment across Kafka Streams application restarts

2018-03-13 Thread Giridhar Addepalli (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Giridhar Addepalli updated KAFKA-6645:
--
Issue Type: New Feature  (was: Bug)

> Sticky Partition Assignment across Kafka Streams application restarts
> -
>
> Key: KAFKA-6645
> URL: https://issues.apache.org/jira/browse/KAFKA-6645
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Giridhar Addepalli
>Priority: Major
>
> Since Kafka Streams applications in general have a lot of state in their 
> stores, it would be good to remember the assignment of partitions to 
> machines, so that when the whole application is restarted for whatever 
> reason, the past assignment of partitions to machines can be reused. There 
> would then be no need to build up state by reading off of the changelog Kafka 
> topic, which would result in faster start-up.
> Samza has support for Host Affinity 
> (https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html).
> KIP-54 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
>  handles cases where some members of the consumer group go down / come up, 
> and ensures there is a minimal diff between assignments before and after a 
> rebalance. 
> But to handle the whole-restart use case, we need to remember the past 
> assignment somewhere, and use it after restart.
> Please let us know if this is an already-solved problem or if there is a 
> cleaner way of achieving this objective.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6645) Sticky Partition Assignment across Kafka Streams application restarts

2018-03-13 Thread Giridhar Addepalli (JIRA)
Giridhar Addepalli created KAFKA-6645:
-

 Summary: Sticky Partition Assignment across Kafka Streams 
application restarts
 Key: KAFKA-6645
 URL: https://issues.apache.org/jira/browse/KAFKA-6645
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Giridhar Addepalli


Since Kafka Streams applications in general have a lot of state in their 
stores, it would be good to remember the assignment of partitions to machines, 
so that when the whole application is restarted for whatever reason, the past 
assignment of partitions to machines can be reused. There would then be no need 
to build up state by reading off of the changelog Kafka topic, which would 
result in faster start-up.

Samza has support for Host Affinity 
(https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html).

KIP-54 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy])
 handles cases where some members of the consumer group go down / come up, and 
ensures there is a minimal diff between assignments before and after a 
rebalance. 

But to handle the whole-restart use case, we need to remember the past 
assignment somewhere, and use it after restart.

Please let us know if this is an already-solved problem or if there is a 
cleaner way of achieving this objective.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6535) Set default retention ms for Streams repartition topics to Long.MAX_VALUE

2018-03-13 Thread Khaireddine Rezgui (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Khaireddine Rezgui reassigned KAFKA-6535:
-

Assignee: Khaireddine Rezgui

> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity to allow any records to 
> be pushed to them with old timestamps (think: bootstrapping, re-processing) 
> and to just rely on the purging API to keep their storage small.
> More specifically, in {{RepartitionTopicConfig}} we have a few default 
> overrides for repartition topic configs; we should just add an override for 
> {{TopicConfig.RETENTION_MS_CONFIG}} to set it to Long.MAX_VALUE. This still 
> allows users to override it themselves if they want via 
> {{StreamsConfig.TOPIC_PREFIX}}. We need to add a unit test to verify that 
> this update takes effect.
> In addition to the code change, we also need doc changes in 
> streams/upgrade_guide.html specifying this default value change.
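A hedged sketch of the override described above (the map name is illustrative;
the actual field in RepartitionTopicConfig may differ):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;

// Illustrative only: add infinite retention to the repartition topic defaults.
Map<String, String> repartitionTopicDefaultOverrides = new HashMap<>(); // hypothetical name
repartitionTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
{code}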



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6576) Configurable Quota Management (KIP-257)

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396741#comment-16396741
 ] 

ASF GitHub Bot commented on KAFKA-6576:
---

rajinisivaram opened a new pull request #4699: KAFKA-6576: Configurable Quota 
Management (KIP-257)
URL: https://github.com/apache/kafka/pull/4699
 
 
   Enable quota calculation to be customized using a configurable callback. See 
KIP-257 for details.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Configurable Quota Management (KIP-257)
> ---
>
> Key: KAFKA-6576
> URL: https://issues.apache.org/jira/browse/KAFKA-6576
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> See 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management]
>  for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6637) if set topic config segment.ms=0 Kafka broker won't be able to start

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396711#comment-16396711
 ] 

ASF GitHub Bot commented on KAFKA-6637:
---

huxihx opened a new pull request #4698: KAFKA-6637: Avoid divide-by-zero error 
with segment.ms set to zero
URL: https://github.com/apache/kafka/pull/4698
 
 
   https://issues.apache.org/jira/browse/KAFKA-6637
   
   Changes of this patch include:
   1. Avoid the divide-by-zero error with segment.ms set to zero
   2. Ignore the effect of setting `log.roll.ms` to zero; use `log.roll.hours` 
instead
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> if set topic config segment.ms=0 Kafka broker won't be able to start
> 
>
> Key: KAFKA-6637
> URL: https://issues.apache.org/jira/browse/KAFKA-6637
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Chong Wang
>Assignee: huxihx
>Priority: Major
>
> If the topic config segment.ms is set to 0, the Kafka server won't be able to 
> start because of a FATAL error:
> [2018-03-12 19:05:40,196] FATAL [KafkaServer id=2] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.ArithmeticException: / by zero
>   at kafka.log.LogConfig.randomSegmentJitter(LogConfig.scala:100)
>   at kafka.log.Log.loadSegments(Log.scala:419)
>   at kafka.log.Log.<init>(Log.scala:203)
>   at kafka.log.Log$.apply(Log.scala:1734)
>   at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221)
>   at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> [https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L100]
> So the minimum value shouldn't be 0:
> https://kafka.apache.org/documentation/#topicconfigs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6644) Make Server Info more generic for Kafka Interactive Queries

2018-03-13 Thread Mike Graham (JIRA)
Mike Graham created KAFKA-6644:
--

 Summary: Make Server Info more generic for Kafka Interactive 
Queries
 Key: KAFKA-6644
 URL: https://issues.apache.org/jira/browse/KAFKA-6644
 Project: Kafka
  Issue Type: Improvement
  Components: config, streams
Reporter: Mike Graham


When working to implement *Kafka Streams interactive queries*, I see that I can 
set `application.server` to `host:port`.

*I would like a more generic mechanism to set additional properties.*

I'm using Cloud Foundry containers for my Kafka Streams app. I scale out my 
containers using `*cf scale*`; each gets its own instance id. The *instance id* 
can be used in an HTTP header to get Cloud Foundry to route the HTTP request to 
the correct instance:

https://docs.cloudfoundry.org/concepts/http-routing.html#app-instance-routing

I realize, per Matthias J Sax, that Kafka Streams only distributes the 
information but does not use it. Thus, I can put the instance id in the port slot.
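A hedged sketch of the workaround described above (host and instance index are
hypothetical; Streams only distributes this value and never connects to it):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

int cfInstanceIndex = 3; // hypothetical Cloud Foundry instance index
Properties props = new Properties();
// Carry the instance id in the numeric "port" slot of application.server.
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "my-app.example.com:" + cfInstanceIndex);
{code}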



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6195) DNS alias support for secured connections

2018-03-13 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396694#comment-16396694
 ] 

Rajini Sivaram commented on KAFKA-6195:
---

Looks good to me, I will respond on the discuss thread.

> DNS alias support for secured connections
> -
>
> Key: KAFKA-6195
> URL: https://issues.apache.org/jira/browse/KAFKA-6195
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Jonathan Skrzypek
>Priority: Major
>
> It seems clients can't use a DNS alias in front of a secured Kafka cluster.
> So applications can only specify a list of hosts or IPs in bootstrap.servers 
> instead of an alias encompassing all cluster nodes.
> Using an alias in bootstrap.servers results in the following error: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Fail to create credential. (63) - No service creds)]) 
> occurred when evaluating SASL token received from the Kafka Broker. Kafka 
> Client will go to AUTH_FAILED state. [Caused by 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Fail to create 
> credential. (63) - No service creds)]]
> When using SASL/Kerberos authentication, the Kafka server principal is of the 
> form kafka/broker1.hostname@example.com
> Kerberos requires that the hosts can be resolved by their FQDNs.
> During the SASL handshake, the client will create a SASL token and then send 
> it to Kafka for auth.
> But to create a SASL token, the client first needs to be able to validate 
> that the broker's Kerberos principal is a valid one.
> There are 3 potential options:
> 1. Create a single Kerberos principal not linked to a host but to an alias, 
> and reference it in the broker JAAS file. But I think the Kerberos 
> infrastructure would refuse to validate it, so the SASL handshake would 
> still fail.
> 2. Modify the client bootstrap mechanism to detect whether bootstrap.servers 
> contains a DNS alias. If it does, resolve and expand the alias to retrieve 
> all hostnames behind it and add them to the list of nodes. This could be 
> done by modifying parseAndValidateAddresses() in ClientUtils (a sketch 
> follows below).
> 3. Add a cluster.alias parameter that would be handled by the logic above, 
> as a separate parameter to avoid confusion about how bootstrap.servers works 
> behind the scenes.
> Thoughts?
> I would be happy to contribute the change for any of the options.
> I believe the ability to use a dns alias instead of static lists of brokers 
> would bring good deployment flexibility.
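
For option 2, a minimal sketch of the alias expansion using plain java.net (a
hypothetical helper, not the actual parseAndValidateAddresses() change):

{code:java}
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class BootstrapAliasExpander {
    // Expand a DNS alias into one bootstrap address per A record, using the
    // canonical host name so Kerberos can match each broker's
    // kafka/<fqdn>@REALM service principal.
    public static List<InetSocketAddress> expandAlias(String alias, int port)
            throws UnknownHostException {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (InetAddress addr : InetAddress.getAllByName(alias)) {
            // Reverse-resolve to the FQDN the broker's principal is keyed on.
            addresses.add(new InetSocketAddress(addr.getCanonicalHostName(), port));
        }
        return addresses;
    }
}
{code}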



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6637) if set topic config segment.ms=0 Kafka broker won't be able to start

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396622#comment-16396622
 ] 

ASF GitHub Bot commented on KAFKA-6637:
---

huxihx closed pull request #4697: KAFKA-6637: Avoid divide by zero exception 
when segment.ms is set to 0
URL: https://github.com/apache/kafka/pull/4697
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index 30ca333dd28..2a941bc4196 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -98,7 +98,7 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp)
 
   def randomSegmentJitter: Long =
-    if (segmentJitterMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
+    if (segmentJitterMs == 0 || segmentMs == 0) 0 else Utils.abs(scala.util.Random.nextInt()) % math.min(segmentJitterMs, segmentMs)
 }
 
 object LogConfig {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index cf22305caf7..e35d188e943 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1088,7 +1088,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   def logIndexSizeMaxBytes = getInt(KafkaConfig.LogIndexSizeMaxBytesProp)
   def logIndexIntervalBytes = getInt(KafkaConfig.LogIndexIntervalBytesProp)
   def logDeleteDelayMs = getLong(KafkaConfig.LogDeleteDelayMsProp)
-  def logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp))
+  def logRollTimeMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeMillisProp)) match {
+    case Some(rollMs) if rollMs > 0 => rollMs
+    case _ => 60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeHoursProp)
+  }
   def logRollTimeJitterMillis: java.lang.Long = Option(getLong(KafkaConfig.LogRollTimeJitterMillisProp)).getOrElse(60 * 60 * 1000L * getInt(KafkaConfig.LogRollTimeJitterHoursProp))
   def logFlushIntervalMs: java.lang.Long = Option(getLong(KafkaConfig.LogFlushIntervalMsProp)).getOrElse(getLong(KafkaConfig.LogFlushSchedulerIntervalMsProp))
   def minInSyncReplicas = getInt(KafkaConfig.MinInSyncReplicasProp)
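
The failure mode is ordinary integer arithmetic: x % 0 throws
ArithmeticException. A standalone Java sketch of the guarded computation (not
Kafka code; Utils.abs is approximated here with Math.floorMod):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

public class SegmentJitterSketch {
    // Mirrors the patched randomSegmentJitter: return 0 when either config is 0,
    // so the modulo below never divides by zero.
    static long randomSegmentJitter(long segmentJitterMs, long segmentMs) {
        if (segmentJitterMs == 0 || segmentMs == 0)
            return 0;
        long bound = Math.min(segmentJitterMs, segmentMs);
        // floorMod keeps the result non-negative for any random int
        return Math.floorMod((long) ThreadLocalRandom.current().nextInt(), bound);
    }

    public static void main(String[] args) {
        System.out.println(randomSegmentJitter(100, 0));  // 0 instead of "/ by zero"
        System.out.println(randomSegmentJitter(100, 50)); // a value in [0, 50)
    }
}
{code}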


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> if set topic config segment.ms=0 Kafka broker won't be able to start
> 
>
> Key: KAFKA-6637
> URL: https://issues.apache.org/jira/browse/KAFKA-6637
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Chong Wang
>Assignee: huxihx
>Priority: Major
>
> If the topic config segment.ms is set to 0, the Kafka server won't be able to start because of a FATAL error:
> [2018-03-12 19:05:40,196] FATAL [KafkaServer id=2] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.ArithmeticException: / by zero
>     at kafka.log.LogConfig.randomSegmentJitter(LogConfig.scala:100)
>     at kafka.log.Log.loadSegments(Log.scala:419)
>     at kafka.log.Log.<init>(Log.scala:203)
>     at kafka.log.Log$.apply(Log.scala:1734)
>     at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221)
>     at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> [https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L100]
> So the minimum allowed value for segment.ms shouldn't be 0:
> https://kafka.apache.org/documentation/#topicconfigs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6637) if set topic config segment.ms=0 Kafka broker won't be able to start

2018-03-13 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-6637:
-

Assignee: huxihx

> if set topic config segment.ms=0 Kafka broker won't be able to start
> 
>
> Key: KAFKA-6637
> URL: https://issues.apache.org/jira/browse/KAFKA-6637
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Chong Wang
>Assignee: huxihx
>Priority: Major
>
> If the topic config segment.ms is set to 0, the Kafka server won't be able to start because of a FATAL error:
> [2018-03-12 19:05:40,196] FATAL [KafkaServer id=2] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.ArithmeticException: / by zero
>     at kafka.log.LogConfig.randomSegmentJitter(LogConfig.scala:100)
>     at kafka.log.Log.loadSegments(Log.scala:419)
>     at kafka.log.Log.<init>(Log.scala:203)
>     at kafka.log.Log$.apply(Log.scala:1734)
>     at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:221)
>     at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$8$$anonfun$apply$16$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:292)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> [https://github.com/apache/kafka/blob/1.0/core/src/main/scala/kafka/log/LogConfig.scala#L100]
> So the minimum allowed value for segment.ms shouldn't be 0:
> https://kafka.apache.org/documentation/#topicconfigs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6617) Improve controller performance by batching reassignment znode write operation

2018-03-13 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6617:

Summary: Improve controller performance by batching reassignment znode 
write operation  (was: KAFKA-6617; Improve controller performance by batching 
reassignment znode write operation)

> Improve controller performance by batching reassignment znode write operation
> -
>
> Key: KAFKA-6617
> URL: https://issues.apache.org/jira/browse/KAFKA-6617
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6617) KAFKA-6617; Improve controller performance by batching reassignment znode write operation

2018-03-13 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6617:

Summary: KAFKA-6617; Improve controller performance by batching 
reassignment znode write operation  (was: Improve controller performance by 
batching reassignment znode write operation)

> KAFKA-6617; Improve controller performance by batching reassignment znode 
> write operation
> -
>
> Key: KAFKA-6617
> URL: https://issues.apache.org/jira/browse/KAFKA-6617
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6555) Making state store queryable during restoration

2018-03-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396593#comment-16396593
 ] 

Matthias J. Sax commented on KAFKA-6555:


Thanks [~asurana]! Looking forward to your KIP!

> Making state store queryable during restoration
> ---
>
> Key: KAFKA-6555
> URL: https://issues.apache.org/jira/browse/KAFKA-6555
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the StreamTask
> is in the RUNNING state. The idea is to make them queryable even in the
> RESTORATION (PARTITION_ASSIGNED) state, as the time spent on restoration can
> be huge, and making the data inaccessible for that long is downtime many
> applications cannot afford.
> When the active partition goes down, one of the following occurs:
>  # One of the standby replica partitions gets promoted to active: the replica
> task has to restore the remaining state from the changelog topic before it can
> become RUNNING. The time this takes depends on how far the replica is lagging
> behind. During this restoration the state store for that partition is
> currently not queryable, resulting in partition downtime. We could make the
> partition queryable for the data already present in the state store.
>  # When there is no replica or standby task, the active task is started on one
> of the existing nodes. That node has to build the entire state from the
> changelog topic, which can take a long time depending on how big the changelog
> topic is, and keeping the state store unqueryable for that whole period is
> downtime for the partition.
> This is an important improvement, as it could significantly increase the
> availability of microservices built with Kafka Streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
>  
>  
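
For context, this is what the restoration gap looks like from the application
side today: a query while the task is restoring throws
InvalidStateStoreException and callers must retry. A minimal sketch, assuming a
hypothetical store named "counts":

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreQuery {
    // Retry until the task transitions back to RUNNING. Under the proposal
    // above, this call would instead succeed during restoration and serve
    // the data already present in the store.
    static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams)
            throws InterruptedException {
        while (true) {
            try {
                return streams.store("counts", QueryableStoreTypes.<String, Long>keyValueStore());
            } catch (InvalidStateStoreException restoring) {
                Thread.sleep(100); // store not queryable while restoring
            }
        }
    }
}
{code}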



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6024) Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396592#comment-16396592
 ] 

ASF GitHub Bot commented on KAFKA-6024:
---

hachikuji closed pull request #4617: KAFKA-6024 Move validation in 
KafkaConsumer ahead of acquireAndEnsure…
URL: https://github.com/apache/kafka/pull/4617
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3cd034eff76..81137f3c8dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -966,13 +966,12 @@ public void subscribe(Collection<String> topics) {
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
+        if (pattern == null)
+            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
+
         acquireAndEnsureOpen();
         try {
-            if (pattern == null)
-                throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
-
             throwIfNoAssignorsConfigured();
-
             log.debug("Subscribed to pattern: {}", pattern);
             this.subscriptions.subscribe(pattern, listener);
             this.metadata.needMetadataForAllTopics(true);
@@ -1337,11 +1336,11 @@ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, Of
      */
     @Override
     public void seek(TopicPartition partition, long offset) {
+        if (offset < 0)
+            throw new IllegalArgumentException("seek offset must not be a negative number");
+
         acquireAndEnsureOpen();
         try {
-            if (offset < 0)
-                throw new IllegalArgumentException("seek offset must not be a negative number");
-
             log.debug("Seeking to offset {} for partition {}", offset, partition);
             this.subscriptions.seek(partition, offset);
         } finally {
@@ -1357,11 +1356,11 @@ public void seek(TopicPartition partition, long offset) {
      * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
      */
     public void seekToBeginning(Collection<TopicPartition> partitions) {
+        if (partitions == null)
+            throw new IllegalArgumentException("Partitions collection cannot be null");
+
         acquireAndEnsureOpen();
         try {
-            if (partitions == null) {
-                throw new IllegalArgumentException("Partitions collection cannot be null");
-            }
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to beginning of partition {}", tp);
@@ -1383,11 +1382,11 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
      * @throws IllegalArgumentException if {@code partitions} is {@code null} or the provided TopicPartition is not assigned to this consumer
      */
     public void seekToEnd(Collection<TopicPartition> partitions) {
+        if (partitions == null)
+            throw new IllegalArgumentException("Partitions collection cannot be null");
+
         acquireAndEnsureOpen();
         try {
-            if (partitions == null) {
-                throw new IllegalArgumentException("Partitions collection cannot be null");
-            }
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
                 log.debug("Seeking to end of partition {}", tp);


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consider moving validation in KafkaConsumer ahead of call to 
> acquireAndEnsureOpen()
> ---
>
> Key: KAFKA-6024
> URL: https://issues.apache.org/jira/browse/KAFKA-6024
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
> Fix For: 1.2.0
>
>
> In several methods, parameter validation is done after calling
> acquireAndEnsureOpen():
> {code}
> 

[jira] [Resolved] (KAFKA-6024) Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()

2018-03-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6024.

   Resolution: Fixed
Fix Version/s: 1.2.0

> Consider moving validation in KafkaConsumer ahead of call to 
> acquireAndEnsureOpen()
> ---
>
> Key: KAFKA-6024
> URL: https://issues.apache.org/jira/browse/KAFKA-6024
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
> Fix For: 1.2.0
>
>
> In several methods, parameter validation is done after calling
> acquireAndEnsureOpen():
> {code}
> public void seek(TopicPartition partition, long offset) {
>     acquireAndEnsureOpen();
>     try {
>         if (offset < 0)
>             throw new IllegalArgumentException("seek offset must not be a negative number");
> {code}
> Since the parameter values do not change per invocation, it seems performing
> validation ahead of the acquireAndEnsureOpen() call would be better.
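
The resulting pattern, in isolation (a hypothetical Resource class, not the
actual KafkaConsumer): cheap argument validation moves ahead of the lock
acquisition, so invalid input fails fast without ever contending for the
consumer:

{code:java}
import java.util.concurrent.locks.ReentrantLock;

public class Resource {
    private final ReentrantLock lock = new ReentrantLock();

    public void seek(long offset) {
        // Validate first: the argument cannot change per invocation, so
        // nothing is gained by holding the lock while checking it.
        if (offset < 0)
            throw new IllegalArgumentException("seek offset must not be a negative number");
        lock.lock();
        try {
            // ... perform the seek under the lock ...
        } finally {
            lock.unlock();
        }
    }
}
{code}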



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has LIMITED retention time

2018-03-13 Thread Navinder Brar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6643:
-
Description: 
In the current scenario, Kafka Streams uses changelog Kafka topics (internal
topics holding all the data for a store) to build the state of replicas. So, if
we keep the number of standby replicas at 1, we get extra availability for
persistent state stores, since changelog topics are themselves replicated per
the broker replication policy, but that also means we use at least 4 times the
space (1 active store, 1 replica store, 1 changelog, 1 changelog replica).

Now, if we have a year's data in the persistent stores (RocksDB), we don't want
the changelog topics to hold a year's data as well, since that puts an
unnecessary space burden on the brokers. If we have to scale a Kafka Streams
application holding 200-300 TB of data, we have to scale the Kafka brokers too.
We want to reduce this dependency and find ways to use the changelog topic
purely as a queue holding just 2 or 3 days of data (see the sketch below),
warming up replicas from scratch some other way.

I have a few proposals in that respect:
1. Use a new Kafka topic per partition that needs to be warmed up on the fly
(when the node holding that partition crashes). Produce into this topic from
another replica/active task and build the new replica from this topic.
2. Use peer-to-peer file transfer (such as SFTP): RocksDB can create backups,
which can be transferred from the source node to the destination node when a
new replica has to be built from scratch.
3. Use HDFS as an intermediary instead of a Kafka topic, keeping scheduled
backups for each partition and using those to build new replicas.

  was:
In the current scenario, Kafka Streams uses changelog Kafka topics (internal
topics holding all the data for a store) to build the state of replicas. So, if
we keep the number of standby replicas at 1, we get extra availability for
persistent state stores, since changelog topics are themselves replicated per
the broker replication policy, but that also means we use at least 4 times the
space (1 active store, 1 replica store, 1 changelog, 1 changelog replica).

Now, if we have a year's data in the persistent stores (RocksDB), we don't want
the changelog topics to hold a year's data as well, since that puts an
unnecessary space burden on the brokers. If we have to scale a Kafka Streams
application holding 200-300 TB of data, we have to scale the Kafka brokers too.
We want to reduce this dependency and find ways to use the changelog topic
purely as a queue holding just 2 or 3 days of data, warming up replicas from
scratch some other way.

I have a few proposals in that respect:
1. Use a new Kafka topic per partition that needs to be warmed up on the fly
(when the node holding that partition crashes). Produce into this topic from
another replica/active task and build the new replica from this topic.
2. Use peer-to-peer file transfer: RocksDB can create backups, which can be
transferred from the source node to the destination node when a new replica has
to be built from scratch.
3. Use HDFS as an intermediary instead of a Kafka topic, keeping scheduled
backups for each partition and using those to build new replicas.
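
A minimal sketch of capping changelog retention from the application side via
Materialized.withLoggingEnabled(), with a hypothetical store name and a ~3 day
retention. Note that with cleanup.policy=delete the changelog alone can no
longer rebuild the full state, which is exactly the gap the proposals above aim
to fill:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ShortRetentionChangelog {
    static Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized() {
        Map<String, String> changelogConfig = new HashMap<>();
        // Keep only ~3 days in the changelog instead of the store's full history.
        changelogConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(3L * 24 * 60 * 60 * 1000));
        changelogConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
        return Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("my-store")
                .withLoggingEnabled(changelogConfig);
    }
}
{code}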


> Warm up new replicas from scratch when changelog topic has LIMITED retention 
> time
> -
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> In the current scenario, Kafka Streams uses changelog Kafka topics (internal
> topics holding all the data for a store) to build the state of replicas. So,
> if we keep the number of standby replicas at 1, we get extra availability for
> persistent state stores, since changelog topics are themselves replicated per
> the broker replication policy, but that also means we use at least 4 times
> the space (1 active store, 1 replica store, 1 changelog, 1 changelog
> replica).
> Now, if we have a year's data in the persistent stores (RocksDB), we don't
> want the changelog topics to hold a year's data as well, since that puts an
> unnecessary space burden on the brokers. If we have to scale a Kafka Streams
> application holding 200-300 TB of data, we have to scale the Kafka brokers
> too. We want to reduce this dependency and find ways to use the changelog
> topic purely as a queue holding just 2 or 3 days of data, warming up replicas
> from scratch some other way.
> I have a few proposals in that respect:
> 1. Use a new kafka topic related to each partition which we need to warm up 
> on