[jira] [Resolved] (KAFKA-6849) Add transformValues() method to KTable

2018-05-18 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6849.
--
   Resolution: Fixed
Fix Version/s: 2.0.0

>  Add transformValues() method to KTable
> ---
>
> Key: KAFKA-6849
> URL: https://issues.apache.org/jira/browse/KAFKA-6849
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
>  Labels: kip
> Fix For: 2.0.0
>
>
> Add {{transformValues()}} methods to the {{KTable}} interface with the same 
> semantics as the functions of the same name on the {{KStream}} interface.
>  
> More details in 
> [KIP-292|https://cwiki.apache.org/confluence/display/KAFKA/KIP-292%3A+Add+transformValues%28%29+method+to+KTable].
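
For illustration, a minimal sketch of what a call to the new API can look like once 2.0.0 is available. The topic name, types, and the transformer body below are hypothetical; only the transformValues() call itself comes from KIP-292.

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

public class KTableTransformValuesExample {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical topic; default serdes are assumed to be set in the app config.
        final KTable<String, String> table = builder.table("input-topic");

        // As on KStream, the key is read-only and retained; only the value is transformed.
        final KTable<String, Integer> lengths = table.transformValues(
            () -> new ValueTransformerWithKey<String, String, Integer>() {
                @Override
                public void init(final ProcessorContext context) { }

                @Override
                public Integer transform(final String readOnlyKey, final String value) {
                    return value == null ? null : value.length();
                }

                @Override
                public void close() { }
            });
    }
}
{code}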



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6849) Add transformValues() method to KTable

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481322#comment-16481322
 ] 

ASF GitHub Bot commented on KAFKA-6849:
---

guozhangwang closed pull request #4959: KAFKA-6849: add transformValues methods 
to KTable.
URL: https://github.com/apache/kafka/pull/4959
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 38955224278..4c76c5e84b6 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -2906,6 +2906,7 @@ OverviewTransform (values only)
 
 KStream → KStream
+KTable → KTable
 
 
 Applies a ValueTransformer to each record, while 
retaining the key of the original record.
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 7788638c195..0e3725ebe32 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -192,6 +192,13 @@ Streams API
 to distinguish them from configurations of other clients that share 
the same config names.
 
 
+
+New method in KTable
+
+
+ transformValues methods have been added to 
KTable. Similar to those on KStream, these methods 
allow for richer, stateful, value transformation similar to the Processor 
API.
+
+
 
New method in GlobalKTable
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 1d42f11c0e5..e75bb3aa227 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -544,12 +544,12 @@ void to(final String topic,
 
 /**
  * Transform the value of each input record into a new value (with 
possible new type) of the output record.
- * A {@link ValueTransformer} (provided by the given {@link 
ValueTransformerSupplier}) is applies to each input
+ * A {@link ValueTransformer} (provided by the given {@link 
ValueTransformerSupplier}) is applied to each input
  * record value and computes a new value for it.
  * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
  * This is a stateful record-by-record operation (cf. {@link 
#mapValues(ValueMapper)}).
  * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress can be observed and additional
- * periodic actions get be performed.
+ * periodic actions can be performed.
  * 
  * In order to assign a state, the state must be created and registered 
beforehand:
  * {@code
@@ -613,12 +613,12 @@ void to(final String topic,
 
 /**
  * Transform the value of each input record into a new value (with 
possible new type) of the output record.
- * A {@link ValueTransformerWithKey} (provided by the given {@link 
ValueTransformerWithKeySupplier}) is applies to each input
+ * A {@link ValueTransformerWithKey} (provided by the given {@link 
ValueTransformerWithKeySupplier}) is applied to each input
  * record value and computes a new value for it.
  * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K:V'>}.
  * This is a stateful record-by-record operation (cf. {@link 
#mapValues(ValueMapperWithKey)}).
  * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress can be observed and additional
- * periodic actions get be performed.
+ * periodic actions can be performed.
  * 
  * In order to assign a state, the state must be created and registered 
beforehand:
  * {@code
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index dbead79c1f6..da540ba627f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -382,6 +383,157 @@
  */

[jira] [Commented] (KAFKA-6729) KTable should use user source topics if possible and not create changelog topic

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481305#comment-16481305
 ] 

ASF GitHub Bot commented on KAFKA-6729:
---

guozhangwang opened a new pull request #5038: KAFKA-6729: Follow up; disable 
logging for source KTable.
URL: https://github.com/apache/kafka/pull/5038
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KTable should use user source topics if possible and not create changelog 
> topic
> ---
>
> Key: KAFKA-6729
> URL: https://issues.apache.org/jira/browse/KAFKA-6729
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.0.0
>
>
> With KIP-182 we reworked the Streams API substantially and introduced a regression into 
> the 1.0 code base. If a KTable is populated from a source topic, i.e., via 
> StreamsBuilder.table(), the KTable creates its own changelog topic. 
> However, in older releases (0.11 or older), we did not create a changelog topic 
> but used the user-specified source topic instead.
> We want to reintroduce this optimization to reduce the load (storage and 
> writes) on the broker side for this case.
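
For illustration, the case in question is a table built directly from a source topic; the topic and store names below are hypothetical. The store backing such a KTable can be restored by re-reading the source topic itself, which is why the extra changelog topic is redundant.

{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class SourceTopicBackedTable {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // The state store "profiles-store" can be rebuilt by replaying "user-profiles"
        // directly, so a separate <application.id>-profiles-store-changelog topic adds
        // broker load (storage plus a second write per update) without adding safety.
        final KTable<String, String> profiles = builder.table(
            "user-profiles",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("profiles-store"));
    }
}
{code}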



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6917) Request handler deadlocks attempting to acquire group metadata lock

2018-05-18 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-6917:
---
Fix Version/s: 0.11.0.3

> Request handler deadlocks attempting to acquire group metadata lock
> ---
>
> Key: KAFKA-6917
> URL: https://issues.apache.org/jira/browse/KAFKA-6917
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> We have noticed another deadlock with the group metadata lock with version 
> 1.1.
> {quote}
> Found one Java-level deadlock:
> =
> "executor-Heartbeat":
>   waiting for ownable synchronizer 0x0005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> "kafka-request-handler-3":
>   waiting for ownable synchronizer 0x0005cbe7f698, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-4"
> "kafka-request-handler-4":
>   waiting for ownable synchronizer 0x0005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> Java stack information for the threads listed above:
> ===
> "executor-Heartbeat":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0005ce477080> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:833)
> at 
> kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34)
> at kafka.server.DelayedOperation.run(DelayedOperation.scala:144)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-3":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0005cbe7f698> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:801)
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:799)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> at 
> kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:799)
> at 
> kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:496)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1633)
> at 
> 

[jira] [Resolved] (KAFKA-6917) Request handler deadlocks attempting to acquire group metadata lock

2018-05-18 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6917.

Resolution: Fixed

> Request handler deadlocks attempting to acquire group metadata lock
> ---
>
> Key: KAFKA-6917
> URL: https://issues.apache.org/jira/browse/KAFKA-6917
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.0.0, 1.0.2, 1.1.1
>
>
> We have noticed another deadlock with the group metadata lock with version 
> 1.1.
> {quote}
> Found one Java-level deadlock:
> =
> "executor-Heartbeat":
>   waiting for ownable synchronizer 0x0005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> "kafka-request-handler-3":
>   waiting for ownable synchronizer 0x0005cbe7f698, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-4"
> "kafka-request-handler-4":
>   waiting for ownable synchronizer 0x0005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> Java stack information for the threads listed above:
> ===
> "executor-Heartbeat":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0005ce477080> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:833)
> at 
> kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34)
> at kafka.server.DelayedOperation.run(DelayedOperation.scala:144)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-3":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0005cbe7f698> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:801)
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:799)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> at 
> kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:799)
> at 
> kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:496)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1633)
> at 
> kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
>   

[jira] [Commented] (KAFKA-6917) Request handler deadlocks attempting to acquire group metadata lock

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16481159#comment-16481159
 ] 

ASF GitHub Bot commented on KAFKA-6917:
---

hachikuji closed pull request #5036: KAFKA-6917: Process txn completion 
asynchronously to avoid deadlock
URL: https://github.com/apache/kafka/pull/5036
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index cbbd91396b2..9748e174c78 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -81,8 +81,7 @@ class GroupCoordinator(val brokerId: Int,
*/
   def startup(enableMetadataExpiration: Boolean = true) {
 info("Starting up.")
-if (enableMetadataExpiration)
-  groupManager.enableMetadataExpiration()
+groupManager.startup(enableMetadataExpiration)
 isActive.set(true)
 info("Startup complete.")
   }
@@ -485,12 +484,12 @@ class GroupCoordinator(val brokerId: Int,
 }
   }
 
-  def handleTxnCompletion(producerId: Long,
-  offsetsPartitions: Iterable[TopicPartition],
-  transactionResult: TransactionResult) {
+  def scheduleHandleTxnCompletion(producerId: Long,
+  offsetsPartitions: Iterable[TopicPartition],
+  transactionResult: TransactionResult) {
 require(offsetsPartitions.forall(_.topic == 
Topic.GROUP_METADATA_TOPIC_NAME))
 val isCommit = transactionResult == TransactionResult.COMMIT
-groupManager.handleTxnCompletion(producerId, 
offsetsPartitions.map(_.partition).toSet, isCommit)
+groupManager.scheduleHandleTxnCompletion(producerId, 
offsetsPartitions.map(_.partition).toSet, isCommit)
   }
 
   private def doCommitOffsets(group: GroupMetadata,
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 81ce8d512e5..c31735b75c8 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -135,13 +135,14 @@ class GroupMetadataManager(brokerId: Int,
   })
 })
 
-  def enableMetadataExpiration() {
+  def startup(enableMetadataExpiration: Boolean) {
 scheduler.startup()
-
-scheduler.schedule(name = "delete-expired-group-metadata",
-  fun = cleanupGroupMetadata,
-  period = config.offsetsRetentionCheckIntervalMs,
-  unit = TimeUnit.MILLISECONDS)
+if (enableMetadataExpiration) {
+  scheduler.schedule(name = "delete-expired-group-metadata",
+fun = cleanupGroupMetadata,
+period = config.offsetsRetentionCheckIntervalMs,
+unit = TimeUnit.MILLISECONDS)
+}
   }
 
   def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
@@ -793,7 +794,20 @@ class GroupMetadataManager(brokerId: Int,
 offsetsRemoved
   }
 
-  def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], 
isCommit: Boolean) {
+  /**
+   * Complete pending transactional offset commits of the groups of 
`producerId` from the provided
+   * `completedPartitions`. This method is invoked when a commit or abort 
marker is fully written
+   * to the log. It may be invoked when a group lock is held by the caller, 
for instance when delayed
+   * operations are completed while appending offsets for a group. Since we 
need to acquire one or
+   * more group metadata locks to handle transaction completion, this 
operation is scheduled on
+   * the scheduler thread to avoid deadlocks.
+   */
+  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: 
Set[Int], isCommit: Boolean): Unit = {
+scheduler.schedule(s"handleTxnCompletion-$producerId", () =>
+  handleTxnCompletion(producerId, completedPartitions, isCommit))
+  }
+
+  private[group] def handleTxnCompletion(producerId: Long, 
completedPartitions: Set[Int], isCommit: Boolean): Unit = {
 val pendingGroups = groupsBelongingToPartitions(producerId, 
completedPartitions)
 pendingGroups.foreach { case (groupId) =>
   getGroup(groupId) match {
@@ -802,7 +816,7 @@ class GroupMetadataManager(brokerId: Int,
 group.completePendingTxnOffsetCommit(producerId, isCommit)
 removeProducerGroup(producerId, groupId)
   }
-   }
+}
 case _ =>
   info(s"Group $groupId has moved away from $brokerId after 
transaction marker was written but before the " +
 s"cache was 

[jira] [Updated] (KAFKA-6914) Kafka Connect - Plugins class should have a constructor that can take in parent ClassLoader

2018-05-18 Thread Sriram KS (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriram KS updated KAFKA-6914:
-
Fix Version/s: 1.1.1

> Kafka Connect - Plugins class should have a constructor that can take in 
> parent ClassLoader
> ---
>
> Key: KAFKA-6914
> URL: https://issues.apache.org/jira/browse/KAFKA-6914
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sriram KS
>Priority: Major
> Fix For: 1.1.1
>
>
> Currently the Plugins class has a single constructor that takes in a map of props.
> Please give the Plugins class a constructor that also takes in a ClassLoader and 
> use it to set the DelegatingClassLoader's parent class loader.
> Reason:
> This would be useful when already running in a managed class-loader environment, 
> such as a Spring Boot app, which resolves class dependencies through 
> Maven/Gradle dependency management.
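
A rough sketch of what the requested overload might look like. Everything below is hypothetical: the real org.apache.kafka.connect.runtime.isolation.Plugins currently exposes only the props-based constructor, and a plain URLClassLoader stands in for Connect's DelegatingClassLoader to keep the example self-contained.

{code:java}
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;

public class Plugins {

    private final ClassLoader delegatingLoader;

    public Plugins(final Map<String, String> props) {
        // Current behaviour: the delegating loader's parent defaults to the loader
        // that loaded the Connect runtime itself.
        this(props, Plugins.class.getClassLoader());
    }

    public Plugins(final Map<String, String> props, final ClassLoader parent) {
        // Requested behaviour: an embedding application (e.g. a Spring Boot app)
        // passes its own class loader so plugin classes resolve against the
        // classpath managed by its Maven/Gradle dependency management.
        this.delegatingLoader = new URLClassLoader(new URL[0], parent);
    }

    public ClassLoader delegatingLoader() {
        return delegatingLoader;
    }
}
{code}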



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-18 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480983#comment-16480983
 ] 

Dong Lin edited comment on KAFKA-6188 at 5/18/18 5:59 PM:
--

Kafka currently handles disk failure on a per-disk basis, i.e. if there is an 
IOException when accessing a given partition on a given disk, the entire disk 
is marked as offline, which in turn marks all partitions on that disk as offline.

Thus, if all disks are marked as offline, all partitions in the broker will be 
offline; there is nothing left for the broker to do and it will shut itself down.

The question here is why there is an IOException in the first place. Does it 
indicate a legitimate issue with the disk, i.e. the disk is problematic and should be 
replaced, or does it indicate a bug in Kafka (e.g. the broker tries to access a 
file after the file has been deleted)?

 

 


was (Author: lindong):
Kafka currently handles disk failure on a per-disk basis, i.e. if there is 
IOException when accessing a given partition on a given disk, the entire disk 
is marked as offline which in turn marks all partitions in that disk as offline.

Thus, if all disks are marked as offline, all partitions in the broker will be 
offline, there is nothing for broker to do and the broker will shutdown itself.

The question here is why there is IOException in the first place. Does it 
indicate legitimate issue with the disk, i.e. disk is problematic and should be 
replaced, or does it indicate a bug in Kafka (e.g. broker try to access a file 
after the fail has been deleted).

 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially everything started fine and standalone tests worked as expected. 
> However, when running my code, the Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and fails with the same error. 
> Deleting the logs lets it start again, and then the same problem recurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-18 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480983#comment-16480983
 ] 

Dong Lin commented on KAFKA-6188:
-

Kafka currently handles disk failure on a per-disk basis, i.e. if there is an 
IOException when accessing a given partition on a given disk, the entire disk 
is marked as offline, which in turn marks all partitions on that disk as offline.

Thus, if all disks are marked as offline, all partitions in the broker will be 
offline; there is nothing left for the broker to do and it will shut itself down.

The question here is why there is an IOException in the first place. Does it 
indicate a legitimate issue with the disk, i.e. the disk is problematic and should be 
replaced, or does it indicate a bug in Kafka (e.g. the broker tries to access a file 
after the file has been deleted)?
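
For context, the per-disk handling described above only keeps a broker alive when it has more than one log directory configured; with a single directory (as in this report), any disk-level IOException takes the whole broker down. An illustrative server.properties fragment (the paths are hypothetical):

{code}
# JBOD: one entry per physical disk. If one directory fails, only its partitions go
# offline; the broker shuts itself down only when every listed directory has failed.
log.dirs=/data/disk1/kafka-logs,/data/disk2/kafka-logs,/data/disk3/kafka-logs
{code}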

 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially everything started fine and standalone tests worked as expected. 
> However, when running my code, the Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and fails with the same error. 
> Deleting the logs lets it start again, and then the same problem recurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-18 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6566:
-
Fix Version/s: 0.10.2.2

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Assignee: Robert Yokota
>Priority: Blocker
> Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-18 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-6566.
--
Resolution: Fixed

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Assignee: Robert Yokota
>Priority: Blocker
> Fix For: 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-18 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6566:
-
Reviewer: Ewen Cheslack-Postava

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Assignee: Robert Yokota
>Priority: Blocker
> Fix For: 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-18 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6566:
-
Fix Version/s: 1.1.1
   1.0.2
   0.11.0.3

> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Gunnar Morling
>Assignee: Robert Yokota
>Priority: Blocker
> Fix For: 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6566) SourceTask#stop() not called after exception raised in poll()

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480970#comment-16480970
 ] 

ASF GitHub Bot commented on KAFKA-6566:
---

ewencp closed pull request #5020: KAFKA-6566: Improve Connect Resource Cleanup
URL: https://github.com/apache/kafka/pull/5020
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2ba785c4668..6edcfd41886 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -148,10 +148,23 @@ public void stop() {
 protected void close() {
 // FIXME Kafka needs to add a timeout parameter here for us to 
properly obey the timeout
 // passed in
-task.stop();
-if (consumer != null)
-consumer.close();
-transformationChain.close();
+try {
+task.stop();
+} catch (Throwable t) {
+log.warn("Could not stop task", t);
+}
+if (consumer != null) {
+try {
+consumer.close();
+} catch (Throwable t) {
+log.warn("Could not close consumer", t);
+}
+}
+try {
+transformationChain.close();
+} catch (Throwable t) {
+log.warn("Could not close transformation chain", t);
+}
 }
 
 @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index f2cef5a63f7..f17475dacfc 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -87,6 +87,7 @@
 private Map<String, String> taskConfig;
 private boolean finishedStart = false;
 private boolean startedShutdownBeforeStartCompleted = false;
+private boolean stopped = false;
 
 public WorkerSourceTask(ConnectorTaskId id,
 SourceTask task,
@@ -137,8 +138,21 @@ public void initialize(TaskConfig taskConfig) {
 
 @Override
 protected void close() {
-producer.close(30, TimeUnit.SECONDS);
-transformationChain.close();
+if (!shouldPause()) {
+tryStop();
+}
+if (producer != null) {
+try {
+producer.close(30, TimeUnit.SECONDS);
+} catch (Throwable t) {
+log.warn("Could not close producer", t);
+}
+}
+try {
+transformationChain.close();
+} catch (Throwable t) {
+log.warn("Could not close transformation chain", t);
+}
 }
 
 @Override
@@ -152,12 +166,23 @@ public void stop() {
 stopRequestedLatch.countDown();
 synchronized (this) {
 if (finishedStart)
-task.stop();
+tryStop();
 else
 startedShutdownBeforeStartCompleted = true;
 }
 }
 
+private synchronized void tryStop() {
+if (!stopped) {
+try {
+task.stop();
+stopped = true;
+} catch (Throwable t) {
+log.warn("Could not stop task", t);
+}
+}
+}
+
 @Override
 public void execute() {
 try {
@@ -166,7 +191,7 @@ public void execute() {
 log.info("{} Source task finished initialization and start", this);
 synchronized (this) {
 if (startedShutdownBeforeStartCompleted) {
-task.stop();
+tryStop();
 return;
 }
 finishedStart = true;


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SourceTask#stop() not called after exception raised in poll()
> -
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>   

[jira] [Commented] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop

2018-05-18 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480849#comment-16480849
 ] 

Matthias J. Sax commented on KAFKA-5697:


[~guozhang] [~vvcephei]: should we reopen this issue?

> StreamThread.shutdown() need to interrupt the stream threads to break the loop
> --
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: John Roesler
>Priority: Major
>  Labels: newbie
> Fix For: 2.0.0
>
>
> In {{StreamThread.shutdown()}} we currently do nothing but set the state, 
> hoping the stream thread will eventually check it and shut itself down. 
> However, under certain scenarios the thread may get blocked within a single 
> loop and hence will never check this state enum. For example, its 
> {{consumer.poll}} call triggers {{ensureCoordinatorReady()}}, which will block 
> until the coordinator can be found. If the coordinator broker is never up and 
> running then the Streams instance will be blocked forever.
> A simple way to reproduce this issue is to start the word count demo without 
> starting the ZK / Kafka broker; it will then get stuck in a single loop, 
> and even `ctrl-C` will not stop it since the state it sets will never be read by 
> the thread:
> {code:java}
> [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
> established. Broker may not be available. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
>  
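
For illustration, the generic mechanism the consumer client offers for breaking a blocked poll() from another thread is wakeup(). The sketch below is not Streams' shutdown path, just a minimal standalone example of the pattern; the topic name and the configuration passed in are hypothetical.

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class InterruptiblePollLoop {

    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    public InterruptiblePollLoop(final Properties config) {
        // config is assumed to contain bootstrap servers, group id and deserializers
        this.consumer = new KafkaConsumer<>(config);
    }

    public void run() {
        try {
            consumer.subscribe(Collections.singletonList("input-topic"));
            while (!shuttingDown.get()) {
                final ConsumerRecords<String, String> records = consumer.poll(100L);
                // ... process records ...
            }
        } catch (final WakeupException e) {
            // Thrown by poll() after another thread calls consumer.wakeup(); treat as shutdown.
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        shuttingDown.set(true);
        // wakeup() is the one KafkaConsumer method that is safe to call from another thread;
        // it unblocks poll() even while it is waiting inside ensureCoordinatorReady().
        consumer.wakeup();
    }
}
{code}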



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5728) Stopping consumer thread cause loosing message in the partition

2018-05-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5728.
--
Resolution: Auto Closed

Looks like this is related to a Spring Kafka configuration issue, most likely around 
committing offsets. Please take a look at the Spring Kafka docs.
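
For reference, the usual way to avoid losing a record whose processing was interrupted is to disable auto-commit and only commit offsets after processing succeeds, so an unprocessed record is simply redelivered on the next start. A minimal plain-consumer sketch of that pattern follows; the topic, group, and process() call are hypothetical, and Spring Kafka exposes the same idea through its manual acknowledgement modes.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitAfterProcessing {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // commit manually
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"));
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (final ConsumerRecord<String, String> record : records) {
                    process(record);  // e.g. the retried POST to the downstream service
                }
                // Offsets advance only after the whole batch succeeded, so a record whose
                // retry was interrupted stays uncommitted and is redelivered on restart.
                consumer.commitSync();
            }
        }
    }

    private static void process(final ConsumerRecord<String, String> record) {
        // hypothetical downstream call
    }
}
{code}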

> Stopping consumer thread cause loosing message in the partition
> ---
>
> Key: KAFKA-5728
> URL: https://issues.apache.org/jira/browse/KAFKA-5728
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Vasudevan Karnan
>Priority: Major
>
> I am currently using a Spring Boot Kafka listener thread to consume messages from 
> a partition.
> There are 10 partitions and the consumer group's concurrency is set to 10.
> In testing, I have 2 messages in a single partition (say, for example, partition 
> 4). I created a listener to read each message and post it to a service. Normally 
> it reads the message, posts it to the service, and everything works as expected. No 
> issues there.
> If the service is down, I use a Spring RetryTemplate to repeatedly retry posting 
> the message to the service, with a configured number of retries and a 
> backoff time in ms. If I then stop the listener, I get 
> org.springframework.retry.backoff.BackOffInterruptedException: Thread 
> interrupted while sleeping; nested exception is 
> java.lang.InterruptedException: sleep interrupted
>   at 
> org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:86)
>  ~[spring-retry-1.1.4.RELEASE.jar:na]
>   at 
> org.springframework.retry.backoff.StatelessBackOffPolicy.backOff(StatelessBackOffPolicy.java:36)
>  ~[spring-retry-1.1.4.RELEASE.jar:na]
> After that I am losing the message from that particular partition (the message 
> that was being retried is lost in the middle) and the lag is reduced. (This 
> happens at the end of stopping the listener.)
> Is there any way not to lose the message even though I am getting the sleep 
> interrupted exception?
> Even if I do not hit the sleep-interrupted exception while stopping the server, 
> on the next listener startup I face the same issue and lose the 
> message again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-18 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480809#comment-16480809
 ] 

Ted Yu commented on KAFKA-6188:
---

The above code came from:

KAFKA-4763; Handle disk failure for JBOD (KIP-112)

[~lindong]:
You may have an answer to Manna's first question.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially everything started fine and standalone tests worked as expected. 
> However, when running my code, the Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and fails with the same error. 
> Deleting the logs lets it start again, and then the same problem recurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6918) Kafka server fails to start with IBM JAVA

2018-05-18 Thread Nayana Thorat (JIRA)
Nayana Thorat created KAFKA-6918:


 Summary: Kafka server fails to start with IBM JAVA
 Key: KAFKA-6918
 URL: https://issues.apache.org/jira/browse/KAFKA-6918
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Nayana Thorat


Kafka server startup fails with the error below:

bin/kafka-server-start.sh -daemon config/server.properties

ERROR:

(kafka.server.KafkaConfig)
 FATAL  (kafka.Kafka$)
java.lang.IllegalArgumentException: Signal already used by VM: INT
    at com.ibm.misc.SignalDispatcher.registerSignal(SignalDispatcher.java:127)
    at sun.misc.Signal.handle(Signal.java:184)
    at kafka.Kafka$.registerHandler$1(Kafka.scala:67)
    at kafka.Kafka$.registerLoggingSignalHandler(Kafka.scala:74)
    at kafka.Kafka$.main(Kafka.scala:85)
    at kafka.Kafka.main(Kafka.scala)

Tried with the binaries as well as Apache Kafka (v1.0.0) built from source.

Installed the IBM SDK on Ubuntu 16.04.

IBM java link:

wget 
http://public.dhe.ibm.com/ibmdl/export/pub/systems/cloud/runtimes/java/8.0.5.10/linux/x86_64/ibm-java-sdk-8.0-5.10-x86_64-archive.bin
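
For illustration, the failure comes from sun.misc.Signal.handle() throwing IllegalArgumentException when the JVM has already reserved the signal (the IBM JDK reserves INT). The sketch below shows a defensive registration pattern; it is not the fix that went into Kafka, just a minimal standalone illustration of the failure mode, and the class and method names are hypothetical.

{code:java}
import sun.misc.Signal;
import sun.misc.SignalHandler;

public class SafeSignalRegistration {

    // Kafka's real handler lives in kafka.Kafka (Scala); the point here is only that
    // registration must be made conditional or tolerant on JVMs that reserve a signal.
    public static void register(final String signalName, final SignalHandler handler) {
        try {
            Signal.handle(new Signal(signalName), handler);
        } catch (final IllegalArgumentException e) {
            // e.g. "Signal already used by VM: INT" on the IBM JDK; skip rather than die.
            System.err.println("Skipping handler for SIG" + signalName + ": " + e.getMessage());
        }
    }

    public static void main(final String[] args) {
        register("TERM", signal -> System.out.println("caught " + signal));
        register("INT", signal -> System.out.println("caught " + signal));
    }
}
{code}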

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480778#comment-16480778
 ] 

ASF GitHub Bot commented on KAFKA-5697:
---

guozhangwang closed pull request #5035: KAFKA-5697: Revert streams wakeup
URL: https://github.com/apache/kafka/pull/5035
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java 
b/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
deleted file mode 100644
index d404642793c..000
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ShutdownException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.errors;
-
-public class ShutdownException extends StreamsException {
-    public ShutdownException(final String message) {
-        super(message);
-    }
-
-    public ShutdownException(final String message, final Throwable throwable) {
-        super(message, throwable);
-    }
-
-    public ShutdownException(final Throwable throwable) {
-        super(throwable);
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
deleted file mode 100644
index 8b912579b9a..000
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ConsumerUtils.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-
-import java.util.Collections;
-import java.util.List;
-
-public final class ConsumerUtils {
-    private ConsumerUtils() {}
-
-    public static <K, V> ConsumerRecords<K, V> poll(final Consumer<K, V> consumer, final long maxDurationMs) {
-        try {
-            return consumer.poll(maxDurationMs);
-        } catch (final WakeupException e) {
-            return new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<K, V>>>emptyMap());
-        }
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 017f2da198f..e8ec5e9fe5f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -23,14 +23,12 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import 

[jira] [Commented] (KAFKA-6873) Broker is not returning data including requested offset

2018-05-18 Thread Adam Dratwinski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480710#comment-16480710
 ] 

Adam Dratwinski commented on KAFKA-6873:


Message format version was 1.1.0 and no compression.

> Broker is not returning data including requested offset
> ---
>
> Key: KAFKA-6873
> URL: https://issues.apache.org/jira/browse/KAFKA-6873
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.0
> Environment: ubuntu
>Reporter: Adam Dratwinski
>Priority: Blocker
>
> After upgrading Kafka from 0.9.x to 1.1.0 I experience issues with the broker 
> returning incomplete responses. This happens for all of my log-compacted 
> topics. I am using the Golang client (Sarama).
> I debugged the issue and found that for some requests the brokers return a 
> FetchResponse in which all messages have offsets lower than requested. For 
> example, I request offset 1078831 and get a FetchResponse with only one 
> message at offset 1078830, which produces a missing-blocks error. If I 
> request the next offset (1078832), then I get a block with many messages, 
> starting at a much higher offset (e.g. 1083813). There is a big gap in offsets 
> between these records, probably because I am using log-compacted topics, but 
> all expected messages are there.
> The Sarama client treats this as a consumer error:
> {quote}kafka: response did not contain all the expected topic/partition blocks
> {quote}
> For the built-in Java client this issue does not happen. It appears to be less 
> strict about the data order: when the requested offset is missing from the 
> returned block, it simply requests the next offset.
> I reported this issue at the Shopify/sarama GitHub project (see 
> [https://github.com/Shopify/sarama/issues/1087]), where I got a response that 
> this seems to be a Kafka bug, as according to the documentation the broker 
> should never return only messages with lower offsets in this situation.
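
For comparison, a minimal sketch of the tolerant read that the description above attributes to the Java 
client (broker address, topic, partition and the offsets taken from the report are placeholders):

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

// Sketch only: seek to an offset that may have been compacted away and simply
// process whatever comes back; offsets are allowed to jump forward on a
// compacted topic.
public class CompactedTopicRead {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("compacted-topic", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 1078831L); // this exact offset may no longer exist
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // The first offset returned may be much higher (e.g. 1083813);
                // that is a compaction gap, not a missing block.
                System.out.println("offset=" + record.offset());
            }
        }
    }
}
{code}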



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6917) Request handler deadlocks attempting to acquire group metadata lock

2018-05-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480706#comment-16480706
 ] 

ASF GitHub Bot commented on KAFKA-6917:
---

rajinisivaram opened a new pull request #5036: KAFKA-6917: Process txn 
completion asynchronously to avoid deadlock
URL: https://github.com/apache/kafka/pull/5036
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Request handler deadlocks attempting to acquire group metadata lock
> ---
>
> Key: KAFKA-6917
> URL: https://issues.apache.org/jira/browse/KAFKA-6917
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.0.0, 1.0.2, 1.1.1
>
>
> We have noticed another deadlock with the group metadata lock with version 
> 1.1.
> {quote}
> Found one Java-level deadlock:
> =
> "executor-Heartbeat":
>   waiting for ownable synchronizer 0x0005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> "kafka-request-handler-3":
>   waiting for ownable synchronizer 0x0005cbe7f698, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-4"
> "kafka-request-handler-4":
>   waiting for ownable synchronizer 0x0005ce477080, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "kafka-request-handler-3"
> Java stack information for the threads listed above:
> ===
> "executor-Heartbeat":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0005ce477080> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:833)
> at 
> kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34)
> at kafka.server.DelayedOperation.run(DelayedOperation.scala:144)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> "kafka-request-handler-3":
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0005cbe7f698> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
> at 
> kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
> at 
> 
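
The PR title describes processing the transaction completion asynchronously so that a request-handler 
thread never holds both locks at once. The general shape of that pattern can be sketched as follows 
(illustrative names only, not Kafka's actual classes):

{code}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative names only: the request-handler thread releases its own lock
// before the completion that needs the other lock runs, so the circular wait
// shown in the thread dump cannot form.
public class AsyncCompletionSketch {
    private final ReentrantLock requestLock = new ReentrantLock();
    private final ReentrantLock groupLock = new ReentrantLock();
    private final ExecutorService completionExecutor = Executors.newSingleThreadExecutor();

    void handleRequest() {
        requestLock.lock();
        try {
            // ... work that genuinely needs requestLock ...
        } finally {
            requestLock.unlock();
        }
        // Hand off the part that needs groupLock instead of running it inline,
        // so this thread never holds both locks at once.
        completionExecutor.submit(this::completeTransaction);
    }

    private void completeTransaction() {
        groupLock.lock();
        try {
            // ... completion work guarded by groupLock ...
        } finally {
            groupLock.unlock();
        }
    }
}
{code}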

[jira] [Commented] (KAFKA-6873) Broker is not returning data including requested offset

2018-05-18 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480703#comment-16480703
 ] 

Ismael Juma commented on KAFKA-6873:


Thanks for the report. What is the message format version and was compression 
being used?

> Broker is not returning data including requested offset
> ---
>
> Key: KAFKA-6873
> URL: https://issues.apache.org/jira/browse/KAFKA-6873
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.0
> Environment: ubuntu
>Reporter: Adam Dratwinski
>Priority: Blocker
>
> After upgrading Kafka from 0.9.x to 1.1.0 I experience issues with the broker 
> returning incomplete responses. This happens for all of my log-compacted 
> topics. I am using the Golang client (Sarama).
> I debugged the issue and found that for some requests the brokers return a 
> FetchResponse in which all messages have offsets lower than requested. For 
> example, I request offset 1078831 and get a FetchResponse with only one 
> message at offset 1078830, which produces a missing-blocks error. If I 
> request the next offset (1078832), then I get a block with many messages, 
> starting at a much higher offset (e.g. 1083813). There is a big gap in offsets 
> between these records, probably because I am using log-compacted topics, but 
> all expected messages are there.
> The Sarama client treats this as a consumer error:
> {quote}kafka: response did not contain all the expected topic/partition blocks
> {quote}
> For the built-in Java client this issue does not happen. It appears to be less 
> strict about the data order: when the requested offset is missing from the 
> returned block, it simply requests the next offset.
> I reported this issue at the Shopify/sarama GitHub project (see 
> [https://github.com/Shopify/sarama/issues/1087]), where I got a response that 
> this seems to be a Kafka bug, as according to the documentation the broker 
> should never return only messages with lower offsets in this situation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-18 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480686#comment-16480686
 ] 

M. Manna edited comment on KAFKA-6188 at 5/18/18 2:01 PM:
--

[~yuzhih...@gmail.com] I am constantly hitting a blocker at this location. There 
are two FATAL shutdowns, and this one is causing issues during startup 
(LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully while log segments/offsets keep growing, violating the 
scheduled retention/cleanup policy?

Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete them.

3) All the above should use some locking/unlocking where applicable (a rough 
sketch follows this list).
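
A rough sketch of steps 1) to 3), with made-up names and no claim to match Kafka's actual cleanup code:

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.locks.ReentrantLock;

// Illustration only: close the old segment, rename it out of the way, then delete,
// all under one lock so readers and the cleanup path cannot interleave.
public class SegmentCleanupSketch {

    // Hypothetical handle standing in for an old log segment.
    interface SegmentHandle {
        void close() throws IOException;
        String path();
    }

    private final ReentrantLock cleanupLock = new ReentrantLock();

    void retireSegment(SegmentHandle segment) throws IOException {
        cleanupLock.lock();                                         // 3) lock around the whole sequence
        try {
            segment.close();                                        // 1) close before touching the file
            Path original = Paths.get(segment.path());
            Path renamed = Paths.get(segment.path() + ".deleted");  // 2) rename so a new segment can be opened
            Files.move(original, renamed, StandardCopyOption.ATOMIC_MOVE);
            Files.deleteIfExists(renamed);                          // in practice deletion would be scheduled later
        } finally {
            cleanupLock.unlock();
        }
    }
}
{code}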

 

 

 


was (Author: manme...@gmail.com):
[~yuzhih...@gmail.com] I am constantly hitting a blocker at this location. There 
are two FATAL shutdowns, and this one is causing issues during startup 
(LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully and messages will keep growing?
Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete them.

3) All the above should use some locking/unlocking where applicable.

 

 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine and standalone tests worked as expected. 
> However, running my code, the Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and fails with the same error. 
> Deleting the logs allows it to start again, but then the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  

[jira] [Comment Edited] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-18 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480686#comment-16480686
 ] 

M. Manna edited comment on KAFKA-6188 at 5/18/18 2:00 PM:
--

[~yuzhih...@gmail.com] I am constantly hitting a blocker at this location. There 
are two FATAL shutdowns, and this one is causing issues during startup 
(LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully and messages will keep growing?
Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete them.

3) All the above should use some locking/unlocking where applicable.

 

 

 


was (Author: manme...@gmail.com):
[~yuzhih...@gmail.com] I am constantly having issues here. There are potentially 
two shutdowns, and this one seems to be causing issues for the FATAL shutdown 
(LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully and messages will keep growing?
Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete them.

3) All the above should use some locking/unlocking where applicable.

 

 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine and standalone tests worked as expected. 
> However, running my code, the Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and fails with the same error. 
> Deleting the logs allows it to start again, but then the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory 

[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-05-18 Thread M. Manna (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480686#comment-16480686
 ] 

M. Manna commented on KAFKA-6188:
-

[~yuzhih...@gmail.com] I am constantly having issues here. There are potentially 
two shutdowns, and this one seems to be causing issues for the FATAL shutdown 
(LogManager):

{code}
// dir should be an absolute path
def handleLogDirFailure(dir: String) {
  info(s"Stopping serving logs in dir $dir")
  logCreationOrDeletionLock synchronized {
    _liveLogDirs.remove(new File(dir))
    if (_liveLogDirs.isEmpty) {
      fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} have failed")
      Exit.halt(1)
    }
{code}

Since the queue is empty (i.e. no valid/clean log directory exists) it wants to 
shut down fatally. If we stop the shutdown here, does it mean that it will keep 
failing gracefully and messages will keep growing?
Additionally, do you believe that a KIP is needed to address a "blocking" log 
cleanup logic such that:

1) Old files are closed, renamed and copied so that new segments can be 
"opened" for writing.

2) Renaming occurs for all old segments - since they are closed, we can safely 
delete them.

3) All the above should use some locking/unlocking where applicable.

 

 

 

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine and standalone tests worked as expected. 
> However, running my code, the Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and fails with the same error. 
> Deleting the logs allows it to start again, but then the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6199) Single broker with fast growing heap usage

2018-05-18 Thread Robin Tweedie (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480626#comment-16480626
 ] 

Robin Tweedie commented on KAFKA-6199:
--

Reading the 1.1 release notes, I suspect the fix was from KAFKA-6529 :D

> Single broker with fast growing heap usage
> --
>
> Key: KAFKA-6199
> URL: https://issues.apache.org/jira/browse/KAFKA-6199
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
> Environment: Amazon Linux
>Reporter: Robin Tweedie
>Priority: Major
> Fix For: 1.1.0
>
> Attachments: Screen Shot 2017-11-10 at 1.55.33 PM.png, Screen Shot 
> 2017-11-10 at 11.59.06 AM.png, dominator_tree.png, histo_live.txt, 
> histo_live_20171206.txt, histo_live_80.txt, jstack-2017-12-08.scrubbed.out, 
> merge_shortest_paths.png, path2gc.png
>
>
> We have a single broker in our cluster of 25 with fast growing heap usage 
> which necessitates us restarting it every 12 hours. If we don't restart the 
> broker, it becomes very slow from long GC pauses and eventually has 
> {{OutOfMemory}} errors.
> See {{Screen Shot 2017-11-10 at 11.59.06 AM.png}} for a graph of heap usage 
> percentage on the broker. A "normal" broker in the same cluster stays below 
> 50% (averaged) over the same time period.
> We have taken heap dumps when the broker's heap usage is getting dangerously 
> high, and there are a lot of retained {{NetworkSend}} objects referencing 
> byte buffers.
> We also noticed that the single affected broker logs a lot more of this kind 
> of warning than any other broker:
> {noformat}
> WARN Attempting to send response via channel for which there is no open 
> connection, connection id 13 (kafka.network.Processor)
> {noformat}
> See {{Screen Shot 2017-11-10 at 1.55.33 PM.png}} for counts of that WARN log 
> message visualized across all the brokers (to show it happens a bit on other 
> brokers, but not nearly as much as it does on the "bad" broker).
> I can't make the heap dumps public, but would appreciate advice on how to pin 
> down the problem better. We're currently trying to narrow it down to a 
> particular client, but without much success so far.
> Let me know what else I could investigate or share to track down the source 
> of this leak.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6917) Request handler deadlocks attempting to acquire group metadata lock

2018-05-18 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6917:
-

 Summary: Request handler deadlocks attempting to acquire group 
metadata lock
 Key: KAFKA-6917
 URL: https://issues.apache.org/jira/browse/KAFKA-6917
 Project: Kafka
  Issue Type: Task
  Components: core
Affects Versions: 1.0.1, 1.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.0.0, 1.0.2, 1.1.1


We have noticed another deadlock with the group metadata lock with version 1.1.

{quote}
Found one Java-level deadlock:
=
"executor-Heartbeat":
  waiting for ownable synchronizer 0x0005ce477080, (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "kafka-request-handler-3"
"kafka-request-handler-3":
  waiting for ownable synchronizer 0x0005cbe7f698, (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "kafka-request-handler-4"
"kafka-request-handler-4":
  waiting for ownable synchronizer 0x0005ce477080, (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "kafka-request-handler-3"

Java stack information for the threads listed above:
===
"executor-Heartbeat":
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0005ce477080> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
at 
kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:833)
at 
kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34)
at kafka.server.DelayedOperation.run(DelayedOperation.scala:144)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

"kafka-request-handler-3":
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0005cbe7f698> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
at 
kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:801)
at 
kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:799)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at 
kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:799)
at 
kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:496)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1633)
at 
kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
at 
kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
at 
kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
at 

[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-05-18 Thread Christoph Schmidt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480453#comment-16480453
 ] 

Christoph Schmidt commented on KAFKA-1194:
--

(!) The only permanently functioning workaround currently known appears to be 
to completely disable the Cleaner:

 
{quote} {{log.cleaner.enable = false}}
{quote}

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment and set log.retention.hours to 24 hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the Kafka broker still cannot delete the old log files, and 
> we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while it 
> is still open, so we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.
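
A small self-contained illustration of the close-before-rename point (file names are made up; on Windows 
the rename fails while the channel is still open or the buffer is still mapped):

{code}
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Sketch only, with made-up file names: renaming a segment while a channel is
// still open (or a MappedByteBuffer over it is still live) is what fails on
// Windows; closing the channel first lets the rename succeed. The JDK offers no
// public unmap call, which is why the comment above mentions forceUnmap.
public class CloseBeforeRename {
    public static void main(String[] args) throws Exception {
        Path segment = Paths.get("00000000000001516723.log");
        Files.write(segment, new byte[]{1, 2, 3});

        try (RandomAccessFile raf = new RandomAccessFile(segment.toFile(), "rw")) {
            FileChannel channel = raf.getChannel();
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 3);
            mapped.put(0, (byte) 42);
            // Renaming here, while the channel is open and the buffer is mapped,
            // is the situation that produces the error in this ticket on Windows.
        }

        // Channel closed: the rename can now go through (the mapping may still pin
        // the file until it is garbage-collected, the MappedByteBuffer caveat above).
        Files.move(segment, Paths.get(segment + ".deleted"), StandardCopyOption.REPLACE_EXISTING);
    }
}
{code}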



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4091) Unable to produce or consume on any topic

2018-05-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4091.
--
Resolution: Cannot Reproduce

Closing inactive issue. Please reopen if you think the issue still exists.
 

> Unable to produce or consume on any topic
> -
>
> Key: KAFKA-4091
> URL: https://issues.apache.org/jira/browse/KAFKA-4091
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
> Environment: Amazon Linux, t2.micro
>Reporter: Avi Chopra
>Priority: Critical
>
> While trying to set up Kafka on 2 slave boxes and 1 master box, I hit a weird 
> condition where I was not able to consume from or produce to a topic.
> Using MirrorMaker to sync data between slave <--> master. Getting the following 
> logs endlessly:
> [2016-08-26 14:28:33,897] WARN Bootstrap broker localhost:9092 disconnected 
> (org.apache.kafka.clients.NetworkClient) [2016-08-26 14:28:43,515] WARN 
> Bootstrap broker localhost:9092 disconnected 
> (org.apache.kafka.clients.NetworkClient) [2016-08-26 14:28:45,118] WARN 
> Bootstrap broker localhost:9092 disconnected 
> (org.apache.kafka.clients.NetworkClient) [2016-08-26 14:28:46,721] WARN 
> Bootstrap broker localhost:9092 disconnected 
> (org.apache.kafka.clients.NetworkClient) [2016-08-26 14:28:48,324] WARN 
> Bootstrap broker localhost:9092 disconnected 
> (org.apache.kafka.clients.NetworkClient) [2016-08-26 14:28:49,927] WARN 
> Bootstrap broker localhost:9092 disconnected 
> (org.apache.kafka.clients.NetworkClient) [2016-08-26 14:28:53,029] WARN 
> Bootstrap broker localhost:9092 disconnected 
> (org.apache.kafka.clients.NetworkClient)
> The only way I could recover was by restarting Kafka, which produced logs like 
> these:
> [2016-08-26 14:30:54,856] WARN Found a corrupted index file, 
> /tmp/kafka-logs/__consumer_offsets-43/.index, deleting 
> and rebuilding index... (kafka.log.Log) [2016-08-26 14:30:54,856] INFO 
> Recovering unflushed segment 0 in log __consumer_offsets-43. (kafka.log.Log) 
> [2016-08-26 14:30:54,857] INFO Completed load of log __consumer_offsets-43 
> with log end offset 0 (kafka.log.Log) [2016-08-26 14:30:54,860] WARN Found a 
> corrupted index file, 
> /tmp/kafka-logs/__consumer_offsets-26/.index, deleting 
> and rebuilding index... (kafka.log.Log) [2016-08-26 14:30:54,860] INFO 
> Recovering unflushed segment 0 in log __consumer_offsets-26. (kafka.log.Log) 
> [2016-08-26 14:30:54,861] INFO Completed load of log __consumer_offsets-26 
> with log end offset 0 (kafka.log.Log) [2016-08-26 14:30:54,864] WARN Found a 
> corrupted index file, 
> /tmp/kafka-logs/__consumer_offsets-35/.index, deleting 
> and rebuilding index... (kafka.log.Log)
> ERROR Error when sending message to topic dr_ubr_analytics_limits with key: 
> null, value: 1 bytes with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 6 ms.
> The consumer group command was showing a major lag.
> This is my test phase, so I was able to restart and recover from the master 
> box, but I want to know what caused this issue and how it can be avoided. Is 
> there a way to debug this issue?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6916) AdminClient does not refresh metadata on broker failure

2018-05-18 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6916:
-

 Summary: AdminClient does not refresh metadata on broker failure
 Key: KAFKA-6916
 URL: https://issues.apache.org/jira/browse/KAFKA-6916
 Project: Kafka
  Issue Type: Task
  Components: admin
Affects Versions: 1.0.1, 1.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.0.0


There are intermittent test failures in DynamicBrokerReconfigurationTest when 
brokers are restarted. The test uses ephemeral ports and hence ports after 
server restart are not the same as the ports before restart. The tests rely on 
metadata refresh on producers, consumers and admin clients to obtain new server 
ports when connections fail. This works with producers and consumers, but 
results in intermittent failures with admin client because refresh is not 
triggered.

There are a couple of issues in AdminClient:
 # Unlike producers and consumers, the admin client does not request a metadata 
update when a connection to a broker fails. This is particularly bad if the 
controller goes down. The controller is used for various requests like 
createTopics and describeTopics. If the controller goes down and 
adminClient.describeTopics() is invoked, the admin client sends the request to 
the old controller. If the connection fails, it keeps retrying with the same 
address; a metadata refresh is never triggered. The request times out after 2 
minutes by default, and metadata is not refreshed for 5 minutes by default. We 
should refresh metadata whenever the connection to a broker fails.
 # Admin client requests are always retried on the same node. In the example 
above, if the controller goes down and a new controller is elected, it would be 
good if the retried request were sent to the new controller. Otherwise we are 
just blocking the call for 2 minutes with retries that will never succeed. A 
caller-side workaround is sketched after this list.
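
A caller-side workaround can be sketched as follows (this is not the proposed fix; broker address, topic 
and timeouts are placeholders): bound each admin call and recreate the client on failure, so the next 
attempt bootstraps metadata afresh.

{code}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Workaround sketch only: bound each admin call and recreate the AdminClient on
// failure, so that the next attempt re-bootstraps and picks up current broker
// addresses. "localhost:9092" and "test-topic" are placeholders.
public class AdminRetryExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");

        for (int attempt = 1; attempt <= 3; attempt++) {
            try (AdminClient admin = AdminClient.create(props)) {
                admin.describeTopics(Collections.singleton("test-topic"))
                     .all()
                     .get(30, TimeUnit.SECONDS);
                return; // success
            } catch (Exception e) {
                // Timeout or connection failure: fall through, recreate the client
                // and retry, instead of blocking on a stale controller address.
                System.out.println("Attempt " + attempt + " failed: " + e);
            }
        }
    }
}
{code}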

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3843) Endless consuming messages in Kafka cluster

2018-05-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3843.
--
Resolution: Auto Closed

The Scala consumers have been deprecated and no further work is planned; please 
upgrade to the Java consumer whenever possible.

> Endless consuming messages in Kafka cluster
> ---
>
> Key: KAFKA-3843
> URL: https://issues.apache.org/jira/browse/KAFKA-3843
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.9.0.1
>Reporter: Tomas Benc
>Assignee: Neha Narkhede
>Priority: Major
>
> We are running Kafka in a cluster (3 virtual machines). Kafka is configured with 
> min.insync.replicas = 2 and topics are configured with replication factor = 3. 
> This configuration means there must be at least 2 of the 3 brokers in the cluster 
> up and running to receive any messages. This works as expected.
> Our consumers are high-level consumers and offsets are committed manually 
> (auto.commit disabled) and stored in Kafka.
> Reproducing the issue:
> 1. Kafka cluster is up and running and receives messages
> 2. Consumers are disabled (messages in Kafka are lagging)
> 3. Disable 2 Kafka brokers in the cluster
> 4. Enable consumers
> The consumers consume messages in batches and commit offsets after processing. 
> But the offset commit fails in Kafka because of NotEnoughReplicasException. That 
> is correct. What is not correct is that the high-level consumer has no idea that 
> the offsets were not committed, and it consumes the same messages again and again.
> It would be helpful if the commitOffsets() method in the 
> kafka.javaapi.consumer.ConsumerConnector interface returned some information 
> about this operation (a boolean return value, or an exception).
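
For comparison, a minimal sketch with the Java consumer (connection settings, group and topic are 
placeholders), where a failed manual commit surfaces as an exception instead of being silently ignored:

{code}
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;

import java.util.Collections;
import java.util.Properties;

// Sketch only: with manual commits the Java consumer fails loudly. If the commit
// cannot be completed (for example CommitFailedException), commitSync() throws,
// so the application knows the offsets were not stored.
public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                // ... process the batch ...
                try {
                    consumer.commitSync();
                } catch (KafkaException e) {
                    // The commit did not go through; stop instead of silently re-reading.
                    System.out.println("Offset commit failed: " + e);
                    break;
                }
            }
        }
    }
}
{code}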



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3620) Clean up Protocol class.

2018-05-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3620.
--
Resolution: Fixed

Closing this as some cleanup was done in newer versions. Please reopen if you 
think otherwise.

> Clean up Protocol class.
> 
>
> Key: KAFKA-3620
> URL: https://issues.apache.org/jira/browse/KAFKA-3620
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ashish Singh
>Assignee: Ashish Singh
>Priority: Major
>
> This came up on the PR for KAFKA-3307. Below is an excerpt.
> {quote}
> With the versioning getting a little more complex in Protocol class, it makes 
> sense to try and encapsulate some of its logic a little better. For example, 
> rather than using raw arrays for each request type, we could have something 
> like this:
> {code}
> class KafkaApi {
>   private ApiKey api;
>   private Schema[] requests;
>   private Schema[] responses;
>   Schema currentSchema();
>   Schema schemaFor(int version);
>   int minVersion();
>   int currentVersion();
> }
> {code}
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-3180) issue for extracting JSON from https web page

2018-05-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-3180.
--
Resolution: Not A Problem

I suggest posting these kinds of queries to the us...@kafka.apache.org mailing 
list (http://kafka.apache.org/contact) for more visibility.

> issue for extracting JSON from https web page
> -
>
> Key: KAFKA-3180
> URL: https://issues.apache.org/jira/browse/KAFKA-3180
> Project: Kafka
>  Issue Type: Test
>  Components: clients
>Affects Versions: 0.9.0.0
> Environment: cloudera 5.4.2.0
>Reporter: swayam
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Hi Team,
> Could you help me with how to extract JSON info from an https web page into 
> HDFS with the help of Kafka?
> Here is the URL where the JSON is available:
> https://affiliate-api.flipkart.net/affiliate/api/8924b177d4c64fcab4db860b94fbcea2.json
> Please help me get the info.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6199) Single broker with fast growing heap usage

2018-05-18 Thread Robin Tweedie (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robin Tweedie resolved KAFKA-6199.
--
   Resolution: Fixed
Fix Version/s: 1.1.0

We upgraded this single broker to 1.1.0 (keeping the log format on 0.9) and 
have had 24 hours with no heap growth. I can only assume this leak was fixed 
somewhere between 0.10.2.1 and 1.1.0.

> Single broker with fast growing heap usage
> --
>
> Key: KAFKA-6199
> URL: https://issues.apache.org/jira/browse/KAFKA-6199
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
> Environment: Amazon Linux
>Reporter: Robin Tweedie
>Priority: Major
> Fix For: 1.1.0
>
> Attachments: Screen Shot 2017-11-10 at 1.55.33 PM.png, Screen Shot 
> 2017-11-10 at 11.59.06 AM.png, dominator_tree.png, histo_live.txt, 
> histo_live_20171206.txt, histo_live_80.txt, jstack-2017-12-08.scrubbed.out, 
> merge_shortest_paths.png, path2gc.png
>
>
> We have a single broker in our cluster of 25 with fast growing heap usage 
> which necessitates us restarting it every 12 hours. If we don't restart the 
> broker, it becomes very slow from long GC pauses and eventually has 
> {{OutOfMemory}} errors.
> See {{Screen Shot 2017-11-10 at 11.59.06 AM.png}} for a graph of heap usage 
> percentage on the broker. A "normal" broker in the same cluster stays below 
> 50% (averaged) over the same time period.
> We have taken heap dumps when the broker's heap usage is getting dangerously 
> high, and there are a lot of retained {{NetworkSend}} objects referencing 
> byte buffers.
> We also noticed that the single affected broker logs a lot more of this kind 
> of warning than any other broker:
> {noformat}
> WARN Attempting to send response via channel for which there is no open 
> connection, connection id 13 (kafka.network.Processor)
> {noformat}
> See {{Screen Shot 2017-11-10 at 1.55.33 PM.png}} for counts of that WARN log 
> message visualized across all the brokers (to show it happens a bit on other 
> brokers, but not nearly as much as it does on the "bad" broker).
> I can't make the heap dumps public, but would appreciate advice on how to pin 
> down the problem better. We're currently trying to narrow it down to a 
> particular client, but without much success so far.
> Let me know what else I could investigate or share to track down the source 
> of this leak.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-1977) Make logEndOffset available in the Zookeeper consumer

2018-05-18 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1977.
--
Resolution: Auto Closed

Closing this as the Scala consumers have been deprecated and no further work is 
planned. This requirement will be tracked in KAFKA-2500 for the Java consumer.

> Make logEndOffset available in the Zookeeper consumer
> -
>
> Key: KAFKA-1977
> URL: https://issues.apache.org/jira/browse/KAFKA-1977
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Will Funnell
>Priority: Minor
> Attachments: 
> Make_logEndOffset_available_in_the_Zookeeper_consumer.patch
>
>
> The requirement is to create a snapshot from the Kafka topic but NOT do 
> continual reads after that point. For example you might be creating a backup 
> of the data to a file.
> In order to achieve that, a recommended solution by Joel Koshy and Jay Kreps 
> was to expose the high watermark, as maxEndOffset, from the FetchResponse 
> object through to each MessageAndMetadata object in order to be aware when 
> the consumer has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the 
> PartitionTopicInfo, which is updated when a new message arrives in the 
> ConsumerFetcherThread and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71
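
With the Java consumer, a similar snapshot-until-end flow can be sketched with endOffsets() and 
position() (connection settings, topic and partition are placeholders):

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

// Sketch only: read a partition from the beginning up to the end offset captured
// when the snapshot starts, then stop.
public class SnapshotToEnd {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("snapshot-topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));

            // High watermark at the time the snapshot starts.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singletonList(tp));
            long target = endOffsets.get(tp);

            while (consumer.position(tp) < target) {
                ConsumerRecords<String, String> records = consumer.poll(500L);
                for (ConsumerRecord<String, String> record : records) {
                    if (record.offset() >= target) {
                        break; // ignore anything written after the snapshot point
                    }
                    // ... write the record to the backup file ...
                }
            }
        }
    }
}
{code}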



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)