[jira] [Resolved] (KAFKA-6445) Remove deprecated metrics in 2.0

2018-06-14 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-6445.

   Resolution: Fixed
 Assignee: Dong Lin  (was: Charly Molter)
Fix Version/s: (was: 2.1.0)
   2.0.0

> Remove deprecated metrics in 2.0
> 
>
> Key: KAFKA-6445
> URL: https://issues.apache.org/jira/browse/KAFKA-6445
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Charly Molter
>Assignee: Dong Lin
>Priority: Trivial
> Fix For: 2.0.0
>
>
> As part of KIP-225 we've replaced a metric and deprecated the old one.
> We should remove these metrics in 2.0.0; this Jira tracks all of the 
> metrics to remove in 2.0.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5950) AdminClient should retry based on returned error codes

2018-06-14 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513367#comment-16513367
 ] 

Ismael Juma commented on KAFKA-5950:


[~cmccabe], this has been done, right?

> AdminClient should retry based on returned error codes
> --
>
> Key: KAFKA-5950
> URL: https://issues.apache.org/jira/browse/KAFKA-5950
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Andrey Dyachkov
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> The AdminClient only retries if the request fails with a retriable error. If 
> a response is returned, then a retry is never attempted. This is inconsistent 
> with other clients that check the error codes in the response and retry for 
> each retriable error code.
> We should consider if it makes sense to adopt this behaviour in the 
> AdminClient so that users don't have to do it themselves.
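To make the proposed behaviour concrete, here is a minimal sketch of retrying on an error code carried inside a returned response. It does not use the Kafka API: SketchResponse, RetryOnErrorCodeSketch and callWithRetries are hypothetical names, and the boolean flag stands in for "the response's error code is retriable". A real client would presumably also apply a backoff between attempts, which is omitted here.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a response that carries an error code; not a Kafka class. */
final class SketchResponse {
    final String value;
    final boolean retriableError; // true if the response's error code is retriable

    SketchResponse(final String value, final boolean retriableError) {
        this.value = value;
        this.retriableError = retriableError;
    }
}

final class RetryOnErrorCodeSketch {
    /**
     * Issues the request up to maxAttempts times, retrying whenever the returned
     * response carries a retriable error code. Returns the last response received.
     */
    static SketchResponse callWithRetries(final Supplier<SketchResponse> request, final int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        SketchResponse response = request.get();
        for (int attempt = 1; attempt < maxAttempts && response.retriableError; attempt++) {
            response = request.get();
        }
        return response;
    }
}
```

This is the loop that users currently have to write themselves around each AdminClient call, according to the description above.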





[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513358#comment-16513358
 ] 

ASF GitHub Bot commented on KAFKA-7021:
---

guozhangwang closed pull request #5232: KAFKA-7021: checkpoint offsets from 
committed
URL: https://github.com/apache/kafka/pull/5232
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 364fbe855d1..7f6ac7ca614 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -163,7 +163,7 @@ public String toString(final String indent) {
 return sb.toString();
 }
 
-protected Map<TopicPartition, Long> recordCollectorOffsets() {
+protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
 return Collections.emptyMap();
 }
 
@@ -234,7 +234,7 @@ void closeStateManager(final boolean writeCheckpoint) 
throws ProcessorStateExcep
 ProcessorStateException exception = null;
 log.trace("{} Closing state manager", logPrefix);
 try {
-stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() 
: null);
 } catch (final ProcessorStateException e) {
 exception = e;
 } finally {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 8d46da19904..a18175ac58b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -81,7 +81,7 @@ public StateDirectory(final String applicationId, final 
String stateDirConfig, f
  * @param taskId
  * @return directory for the {@link TaskId}
  */
-File directoryForTask(final TaskId taskId) {
+public File directoryForTask(final TaskId taskId) {
 final File taskDir = new File(stateDir, taskId.toString());
 if (!taskDir.exists() && !taskDir.mkdir()) {
 throw new ProcessorStateException(String.format("task directory 
[%s] doesn't exist and couldn't be created",
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4b24aab65a3..86855f39c6e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -286,7 +286,7 @@ void commit(final boolean startNewTransaction) {
 public void run() {
 flushState();
 if (!eosEnabled) {
-stateMgr.checkpoint(recordCollectorOffsets());
+stateMgr.checkpoint(activeTaskCheckpointableOffsets());
 }
 commitOffsets(startNewTransaction);
 }
@@ -297,8 +297,17 @@ public void run() {
 }
 
 @Override
-protected Map<TopicPartition, Long> recordCollectorOffsets() {
-return recordCollector.offsets();
+
+protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
+// put both producer acked offsets and consumer committed offsets as checkpointable offsets
+final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
+for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+if (!checkpointableOffsets.containsKey(entry.getKey())) {
+checkpointableOffsets.put(entry.getKey(), entry.getValue());
+}
+}
+
+return checkpointableOffsets;
 }
 
 @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
new file mode 100644
index 000..54c2bd7ede4
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use 

[jira] [Updated] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail

2018-06-14 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6360:
---
Fix Version/s: 1.0.2
   0.11.0.3
   0.10.2.2

> RocksDB segments not removed when store is closed causes re-initialization to 
> fail
> --
>
> Key: KAFKA-6360
> URL: https://issues.apache.org/jira/browse/KAFKA-6360
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Blocker
> Fix For: 0.10.2.2, 1.1.0, 0.11.0.3, 1.0.2
>
>
> When a store is re-initialized it is first closed, before it is opened again. 
> When this happens the segments in the {{Segments}} class are closed, but they 
> are not removed from the list of segments. So when the store is 
> re-initialized the old closed segments are used. This results in:
> {code}
> [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] 
> task [1_3] Failed to flush state store 
> KSTREAM-AGGREGATE-STATE-STORE-24:  
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
> {code}
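The failure mode described above can be reproduced in miniature. The sketch below is not the Kafka Streams code: SketchSegment and SketchSegments are hypothetical classes, IllegalStateException stands in for InvalidStateStoreException, and clearing the map on close is only one plausible fix, assumed here for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical segment with an open/closed flag; not the Kafka Streams Segment class. */
final class SketchSegment {
    private boolean open = true;

    void close() {
        open = false;
    }

    void put(final String key, final String value) {
        if (!open) {
            throw new IllegalStateException("Store is currently closed");
        }
        // a real segment would write to its underlying store here
    }
}

final class SketchSegments {
    private final Map<Long, SketchSegment> segments = new TreeMap<>();

    /** Returns the registered segment for this id, creating it if none is registered. */
    SketchSegment getOrCreate(final long segmentId) {
        return segments.computeIfAbsent(segmentId, id -> new SketchSegment());
    }

    /** Buggy variant: closes each segment but leaves it registered. */
    void closeWithoutClearing() {
        for (final SketchSegment segment : segments.values()) {
            segment.close();
        }
    }

    /** Fixed variant (assumed): closes each segment and then forgets it. */
    void closeAndClear() {
        for (final SketchSegment segment : segments.values()) {
            segment.close();
        }
        segments.clear();
    }
}
```

After closeWithoutClearing(), getOrCreate() hands back the old closed segment and the next put() fails; after closeAndClear(), a fresh open segment is created instead.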





[jira] [Updated] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail

2018-06-14 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6360:
---
Affects Version/s: 0.10.2.1
   0.11.0.2
   1.0.1

> RocksDB segments not removed when store is closed causes re-initialization to 
> fail
> --
>
> Key: KAFKA-6360
> URL: https://issues.apache.org/jira/browse/KAFKA-6360
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Blocker
> Fix For: 1.1.0
>
>
> When a store is re-initialized it is first closed, before it is opened again. 
> When this happens the segments in the {{Segments}} class are closed, but they 
> are not removed from the list of segments. So when the store is 
> re-initialized the old closed segments are used. This results in:
> {code}
> [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] 
> task [1_3] Failed to flush state store 
> KSTREAM-AGGREGATE-STATE-STORE-24:  
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
> {code}





[jira] [Resolved] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-14 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7055.

   Resolution: Fixed
Fix Version/s: 2.0.0

> Kafka Streams Processor API allows you to add sinks and processors without 
> parent
> -
>
> Key: KAFKA-7055
> URL: https://issues.apache.org/jira/browse/KAFKA-7055
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Nikki Thean
>Assignee: Nikki Thean
>Priority: Minor
>  Labels: easyfix
> Fix For: 2.0.0
>
>
> The Kafka Streams Processor API allows you to define a Topology and connect 
> sources, processors, and sinks. From reading through the code, it seems that 
> you cannot forward a message to a downstream node unless it is explicitly 
> connected to the upstream node (from which you are forwarding the message) as 
> a child. Here is an example where you forward using the name of the 
> downstream node rather than the child index 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]).
> However, I've been able to connect processors and sinks to the topology 
> without including parent names, i.e. with an empty vararg, using this method: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423].
> As any attempt to forward a message to those nodes will throw a 
> StreamsException, I suggest throwing an exception if a processor or sink is 
> added without at least one upstream node. There is a method in 
> `InternalTopologyBuilder` that allows you to connect processors by name after 
> you add them to the topology, but it is not part of the external Processor 
> API.
> In addition (or alternatively), I suggest making [the error message for when 
> users try to forward messages to a node that is not 
> connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119]
>  more descriptive, like this one for when a user attempts to access a state 
> store that is not connected to the processor: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81]
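The suggested check can be sketched in a few lines. This is an illustration only, not the Kafka Streams builder: ParentCheckSketch and its methods are hypothetical, and IllegalArgumentException stands in for the TopologyException mentioned in the fix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class ParentCheckSketch {
    // node name -> names of its upstream (parent) nodes
    private final Map<String, String[]> parentsByNode = new HashMap<>();

    /** Sources are the only nodes allowed to have no parent. */
    void addSource(final String name) {
        Objects.requireNonNull(name, "name must not be null");
        parentsByNode.put(name, new String[0]);
    }

    /** Rejects a processor that is added with an empty parent vararg. */
    void addProcessor(final String name, final String... parentNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(parentNames, "parent names must not be null");
        if (parentNames.length == 0) {
            throw new IllegalArgumentException("Processor " + name + " must have at least one parent");
        }
        for (final String parent : parentNames) {
            if (!parentsByNode.containsKey(parent)) {
                throw new IllegalArgumentException("Parent " + parent + " is not added yet");
            }
        }
        parentsByNode.put(name, parentNames.clone());
    }

    boolean contains(final String name) {
        return parentsByNode.containsKey(name);
    }
}
```

Failing at build time this way surfaces the mistake when the topology is defined, rather than later when a message is forwarded to the unconnected node.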





[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513283#comment-16513283
 ] 

ASF GitHub Bot commented on KAFKA-7055:
---

mjsax closed pull request #5215: KAFKA-7055: Update InternalTopologyBuilder to 
throw TopologyException if a processor or sink is added with no upstream node 
attached
URL: https://github.com/apache/kafka/pull/5215
 
 
   


diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7356aff153f..e7dabbf649c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -580,7 +580,6 @@ public boolean test(final K1 key, final V1 value) {
 final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME 
: JOIN_NAME);
 builder.internalTopologyBuilder.addProcessor(name, new 
KStreamKTableJoin<>(((KTableImpl) other).valueGetterSupplier(), 
joiner, leftJoin), this.name);
 builder.internalTopologyBuilder.connectProcessorAndStateStores(name, 
((KTableImpl) other).valueGetterSupplier().storeNames());
-builder.internalTopologyBuilder.connectProcessors(this.name, 
((KTableImpl) other).name);
 
 return new KStreamImpl<>(builder, name, allSourceNodes, false);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 36a2edc6766..5b4b4d737b4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -442,6 +442,11 @@ public final void addSource(final Topology.AutoOffsetReset 
offsetReset,
  final String... predecessorNames) {
 Objects.requireNonNull(name, "name must not be null");
 Objects.requireNonNull(topic, "topic must not be null");
+Objects.requireNonNull(predecessorNames, "predecessor names must not 
be null");
+if (predecessorNames.length == 0) {
+throw new TopologyException("Sink " + name + " must have at least 
one parent");
+}
+
 addSink(name, new StaticTopicNameExtractor(topic), 
keySerializer, valSerializer, partitioner, predecessorNames);
 nodeToSinkTopic.put(name, topic);
 }
@@ -454,9 +459,13 @@ public final void addSource(final Topology.AutoOffsetReset 
offsetReset,
  final String... predecessorNames) {
 Objects.requireNonNull(name, "name must not be null");
 Objects.requireNonNull(topicExtractor, "topic extractor must not be 
null");
+Objects.requireNonNull(predecessorNames, "predecessor names must not 
be null");
 if (nodeFactories.containsKey(name)) {
 throw new TopologyException("Processor " + name + " is already 
added.");
 }
+if (predecessorNames.length == 0) {
+throw new TopologyException("Sink " + name + " must have at least 
one parent");
+}
 
 for (final String predecessor : predecessorNames) {
 Objects.requireNonNull(predecessor, "predecessor name can't be 
null");
@@ -481,9 +490,13 @@ public final void addProcessor(final String name,
final String... predecessorNames) {
 Objects.requireNonNull(name, "name must not be null");
 Objects.requireNonNull(supplier, "supplier must not be null");
+Objects.requireNonNull(predecessorNames, "predecessor names must not 
be null");
 if (nodeFactories.containsKey(name)) {
 throw new TopologyException("Processor " + name + " is already 
added.");
 }
+if (predecessorNames.length == 0) {
+throw new TopologyException("Processor " + name + " must have at 
least one parent");
+}
 
 for (final String predecessor : predecessorNames) {
 Objects.requireNonNull(predecessor, "predecessor name must not be 
null");
@@ -592,21 +605,6 @@ public final void markSourceStoreAndTopic(final 
StoreBuilder storeBuilder,
 storeToSourceChangelogTopic.put(storeBuilder, topic);
 }
 
-public final void connectProcessors(final String... processorNames) {
-if (processorNames.length < 2) {
-throw new TopologyException("At least two processors need to 
participate in the 

[jira] [Commented] (KAFKA-7060) Command-line overrides for ConnectDistributed worker properties

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513158#comment-16513158
 ] 

ASF GitHub Bot commented on KAFKA-7060:
---

kevin-laff opened a new pull request #5234: KAFKA-7060: Add override arguments 
to ConnectDistributed command line
URL: https://github.com/apache/kafka/pull/5234
 
 
   Allow ConnectDistributed to accept an unlimited number of --override 
key=value command-line arguments.
   
   This is an implementation of KIP-316.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Command-line overrides for ConnectDistributed worker properties
> ---
>
> Key: KAFKA-7060
> URL: https://issues.apache.org/jira/browse/KAFKA-7060
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Kevin Lafferty
>Priority: Major
>
> This Jira is for tracking the implementation for 
> [KIP-316|https://cwiki.apache.org/confluence/display/KAFKA/KIP-316%3A+Command-line+overrides+for+ConnectDistributed+worker+properties].
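As a rough illustration of what repeated --override key=value arguments could look like on the parsing side, here is a minimal sketch. It is not the code from the pull request: OverrideArgsSketch and applyOverrides are hypothetical names, and the handling of malformed arguments is an assumption.

```java
import java.util.Properties;

final class OverrideArgsSketch {
    /**
     * Returns a copy of base with every "--override key=value" pair from args applied.
     * Arguments that are not part of an --override pair are ignored by this sketch.
     */
    static Properties applyOverrides(final Properties base, final String[] args) {
        final Properties result = new Properties();
        result.putAll(base);
        for (int i = 0; i < args.length; i++) {
            if (!"--override".equals(args[i])) {
                continue;
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("--override requires a key=value argument");
            }
            final String pair = args[++i];
            final int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            // split on the first '=' only, so values may themselves contain '='
            result.setProperty(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return result;
    }
}
```

Later overrides of the same key win over earlier ones and over the base properties, which matches the usual expectation for command-line overrides.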





[jira] [Updated] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning

2018-06-14 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-6975:
---
Fix Version/s: 1.1.1

> AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
> -
>
> Key: KAFKA-6975
> URL: https://issues.apache.org/jira/browse/KAFKA-6975
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Blocker
> Fix For: 2.0.0, 1.1.1
>
>
> AdminClient.deleteRecords(beforeOffset(offset)) will set log start offset to 
> the requested offset. If the requested offset is in the middle of the batch, 
> the replica will not be able to fetch from that offset (because it is in the 
> middle of the batch). 
> One use-case where this could cause problems is replica re-assignment. 
> Suppose we have a topic partition with 3 initial replicas, and at some point 
> the user issues AdminClient.deleteRecords() for an offset that falls in the 
> middle of a batch. It now becomes the log start offset for this topic 
> partition. Suppose at some later time, the user starts partition 
> re-assignment to 3 new replicas. The new replicas (followers) will start with 
> HW = 0, will try to fetch from 0, then get "out of order offset" because 0 < 
> log start offset (LSO); the follower will be able to reset offset to LSO of 
> the leader and fetch LSO; the leader will send a batch in response with base 
> offset < LSO, which will stop the fetcher thread. The end result is that the 
> new replicas will not be able to start fetching unless LSO moves to an offset 
> that is not in the middle of the batch, and the re-assignment will be stuck 
> for a possibly very long time. 
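The "middle of the batch" condition can be stated precisely with a small sketch. The classes below are hypothetical and model a batch only by its base and last offsets; they are not Kafka's log or record batch classes.

```java
import java.util.List;

/** Hypothetical record batch described only by its first and last offsets (inclusive). */
final class SketchBatch {
    final long baseOffset;
    final long lastOffset;

    SketchBatch(final long baseOffset, final long lastOffset) {
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
    }
}

final class MidBatchSketch {
    /**
     * Returns true if offset lies inside some batch without being that batch's base
     * offset. A log start offset like this means the batch containing it has a base
     * offset smaller than the log start offset.
     */
    static boolean fallsMidBatch(final List<SketchBatch> batches, final long offset) {
        for (final SketchBatch batch : batches) {
            if (offset > batch.baseOffset && offset <= batch.lastOffset) {
                return true;
            }
        }
        return false;
    }
}
```

With batches covering offsets 0 to 4 and 5 to 9, a requested offset of 7 falls mid-batch, while 5 sits exactly on a batch boundary and does not.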





[jira] [Commented] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-06-14 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513135#comment-16513135
 ] 

Matthias J. Sax commented on KAFKA-5054:


I believe there was a PR that got merged. I think we can close this with fix 
version 2.0. cc [~damianguy] [~guozhang]

> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Critical
> Fix For: 2.1.0
>
>
> {{putIfAbsent}} and {{delete}} should be synchronized, as they involve at 
> least 2 operations on the underlying store and may produce inconsistent 
> results if someone queries the store via Interactive Queries (IQ).
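A minimal sketch of the idea, assuming a wrapper that combines a read and a write on an inner store: making the compound methods synchronized keeps a concurrent reader from seeing the state between the two steps. SynchronizedStoreSketch is a hypothetical class backed by a HashMap, not the ChangeLoggingKeyValueBytesStore implementation, and the list of logged keys only stands in for the changelog.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SynchronizedStoreSketch {
    private final Map<String, String> inner = new HashMap<>();
    private final List<String> loggedKeys = new ArrayList<>(); // stand-in for the changelog

    synchronized String get(final String key) {
        return inner.get(key);
    }

    /** Read-then-write done under one lock, so no reader observes the intermediate state. */
    synchronized String putIfAbsent(final String key, final String value) {
        final String previous = inner.get(key);
        if (previous == null) {
            inner.put(key, value);
            loggedKeys.add(key);
        }
        return previous;
    }

    /** Remove-then-log done under the same lock. */
    synchronized String delete(final String key) {
        final String previous = inner.remove(key);
        loggedKeys.add(key);
        return previous;
    }

    synchronized int loggedCount() {
        return loggedKeys.size();
    }
}
```

All accessors share the same monitor, so a query thread calling get() either sees the store before the compound operation or after it, never in between.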





[jira] [Created] (KAFKA-7060) Command-line overrides for ConnectDistributed worker properties

2018-06-14 Thread Kevin Lafferty (JIRA)
Kevin Lafferty created KAFKA-7060:
-

 Summary: Command-line overrides for ConnectDistributed worker 
properties
 Key: KAFKA-7060
 URL: https://issues.apache.org/jira/browse/KAFKA-7060
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Kevin Lafferty


This Jira is for tracking the implementation for 
[KIP-316|https://cwiki.apache.org/confluence/display/KAFKA/KIP-316%3A+Command-line+overrides+for+ConnectDistributed+worker+properties].





[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513125#comment-16513125
 ] 

ASF GitHub Bot commented on KAFKA-6711:
---

mjsax closed pull request #4789: KAFKA-6711: GlobalStateManagerImpl should not 
write offsets
URL: https://github.com/apache/kafka/pull/4789
 
 
   


diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 6052f96053c..be160bd5a33 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -167,7 +167,7 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 for (final TopicPartition topicPartition : topicPartitions) {
 consumer.assign(Collections.singletonList(topicPartition));
 final Long checkpoint = checkpointableOffsets.get(topicPartition);
-if (checkpoint != null) {
+if (checkpoint != null && checkpoint > 
StateRestorer.NO_CHECKPOINT) {
 consumer.seek(topicPartition, checkpoint);
 } else {
 
consumer.seekToBeginning(Collections.singletonList(topicPartition));
@@ -249,10 +249,33 @@ public void close(final Map<TopicPartition, Long> offsets) throws IOException {
 
 @Override
 public void checkpoint(final Map<TopicPartition, Long> offsets) {
+
+// Find non persistent store's topics
+final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
+for (final StateStore store : topology.globalStateStores()) {
+if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) {
+globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+}
+}
+
 checkpointableOffsets.putAll(offsets);
-if (!checkpointableOffsets.isEmpty()) {
+
+final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+// Skip non persistent store
+for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+final String topic = topicPartitionOffset.getKey().topic();
+if (globalNonPersistentStoresTopics.contains(topic)) {
+filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT);
+} else {
+filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
+}
+}
+
+if (!filteredOffsets.isEmpty()) {
 try {
-checkpoint.write(checkpointableOffsets);
+checkpoint.write(filteredOffsets);
 } catch (IOException e) {
 log.warn("Failed to write offsets checkpoint for global stores", e);
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index e9d61f5ad64..7e838763031 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -68,12 +68,13 @@
 private final MockStateRestoreListener stateRestoreListener = new 
MockStateRestoreListener();
 private final TopicPartition t1 = new TopicPartition("t1", 1);
 private final TopicPartition t2 = new TopicPartition("t2", 1);
+private final TopicPartition t3 = new TopicPartition("t3", 1);
 private GlobalStateManagerImpl stateManager;
 private NoOpProcessorContext context;
 private StateDirectory stateDirectory;
 private String stateDirPath;
 private NoOpReadOnlyStore store1;
-private NoOpReadOnlyStore store2;
+private NoOpReadOnlyStore store2, store3;
 private MockConsumer consumer;
 private File checkpointFile;
 private ProcessorTopology topology;
@@ -83,18 +84,21 @@ public void before() throws IOException {
 final Map storeToTopic = new HashMap<>();
 storeToTopic.put("t1-store", "t1");
 storeToTopic.put("t2-store", "t2");
+storeToTopic.put("t3-store", "t3");
 
 final Map storeToProcessorNode = new 
HashMap<>();
-store1 = new NoOpReadOnlyStore<>("t1-store");
+store1 = new NoOpReadOnlyStore<>("t1-store", true);
 

[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513124#comment-16513124
 ] 

ASF GitHub Bot commented on KAFKA-6711:
---

mjsax closed pull request #4782: KAFKA-6711: GlobalStateManagerImpl should not 
write offsets of in-mem…
URL: https://github.com/apache/kafka/pull/4782
 
 
   


diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 56e6bed0850..17ae70ce0d1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -41,6 +41,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -242,7 +243,7 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
 final Long checkpoint = checkpointableOffsets.get(topicPartition);
-if (checkpoint != null) {
+if (checkpoint != null && checkpoint > 
StateRestorer.NO_CHECKPOINT) {
 globalConsumer.seek(topicPartition, checkpoint);
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
@@ -334,10 +335,33 @@ public void close(final Map<TopicPartition, Long> offsets) throws IOException {
 
 @Override
 public void checkpoint(final Map<TopicPartition, Long> offsets) {
+
+// Find non persistent store's topics
+final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
+for (final StateStore store : topology.globalStateStores()) {
+if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) {
+globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+}
+}
+
 checkpointableOffsets.putAll(offsets);
-if (!checkpointableOffsets.isEmpty()) {
+
+final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+// Skip non persistent store
+for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+final String topic = topicPartitionOffset.getKey().topic();
+if (globalNonPersistentStoresTopics.contains(topic)) {
+filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT);
+} else {
+filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
+}
+}
+
+if (!filteredOffsets.isEmpty()) {
 try {
-checkpoint.write(checkpointableOffsets);
+checkpoint.write(filteredOffsets);
 } catch (IOException e) {
 log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index df8d2010d24..39b7bb4747f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -488,6 +488,16 @@ public void shouldCheckpointRestoredOffsetsToFile() throws 
IOException {
 assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
 }
 
+@Test
+public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws 
IOException {
+stateManager.initialize();
+initializeConsumer(10, 1, t3);
+stateManager.register(store3, stateRestoreCallback);
+stateManager.close(Collections.emptyMap());
+
+assertThat(readOffsetsCheckpoint(), 
equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT)));
+}
+
 private Map readOffsetsCheckpoint() throws 
IOException {
 final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new 
File(stateManager.baseDir(),

 ProcessorStateManager.CHECKPOINT_FILE_NAME));
diff --git 

[jira] [Assigned] (KAFKA-5098) KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char

2018-06-14 Thread Ahmed Al-Mehdi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Al-Mehdi reassigned KAFKA-5098:
-

Assignee: Ahmed Al-Mehdi  (was: huxihx)

> KafkaProducer.send() blocks and generates TimeoutException if topic name has 
> illegal char
> -
>
> Key: KAFKA-5098
> URL: https://issues.apache.org/jira/browse/KAFKA-5098
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
> Environment: Java client running against server using 
> wurstmeister/kafka Docker image.
>Reporter: Jeff Larsen
>Assignee: Ahmed Al-Mehdi
>Priority: Major
> Fix For: 2.1.0
>
>
> The server is running with auto create enabled. If we try to publish to a 
> topic with a forward slash in the name, the call blocks and we get a 
> TimeoutException in the Callback. I would expect it to return immediately 
> with an InvalidTopicException.
> There are other blocking issues that have been reported which may be related 
> to some degree, but this particular cause seems unrelated.
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
> Properties props = new Properties();
> props.put("bootstrap.servers", "kafka.example.com:9092");
> props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("max.block.ms", 10000); // 10 seconds should illustrate our point
> String separator = "/";
> //String separator = "_";
> try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>   System.out.println("Calling KafkaProducer.send() at " + new Date());
>   producer.send(
>   new ProducerRecord<String, String>("abc" + separator + "someStreamName",
>   "Not expecting a TimeoutException here"),
>   new Callback() {
> @Override
> public void onCompletion(RecordMetadata metadata, Exception e) {
>   if (e != null) {
> System.out.println(e.toString());
>   }
> }
>   });
>   System.out.println("KafkaProducer.send() completed at " + new Date());
> }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the 
> TimeoutException we encountered did not help point us in the right direction.
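
Until the client fails fast on its own, a caller could check the topic name before calling send(). The following is a minimal sketch, not Kafka's own validation code: the class and method names are hypothetical, and the rules encoded here (ASCII letters, digits, '.', '_' and '-', at most 249 characters, and neither "." nor "..") are stated as my understanding of the broker's topic naming rules rather than something taken from this thread.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the Kafka client API.
final class TopicNameCheck {
    // Assumed rule set: letters, digits, '.', '_' and '-' only.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    // Assumed maximum topic name length.
    private static final int MAX_LENGTH = 249;

    // Returns true if the name satisfies the assumed naming rules.
    static boolean isLegal(String topic) {
        return topic != null
                && !topic.equals(".")
                && !topic.equals("..")
                && topic.length() <= MAX_LENGTH
                && LEGAL.matcher(topic).matches();
    }

    // Throws immediately instead of letting send() block until max.block.ms expires.
    static void requireLegal(String topic) {
        if (!isLegal(topic)) {
            throw new IllegalArgumentException("Illegal topic name: " + topic);
        }
    }
}
```

Calling TopicNameCheck.requireLegal("abc/someStreamName") before producer.send() would raise an exception right away, which is closer to the behaviour the reporter expected.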



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-06-14 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6711:
---
Fix Version/s: 1.1.1
   1.0.2
   0.11.0.3
   0.10.2.2

> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
> Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> We are using an InMemoryStore along with GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debugged it, the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contained an 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded against cases like mine. 
> I am fairly new to Kafka land, so there might be another way to fix the 
> issue. 
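
The guard the reporter is asking for can be sketched in isolation. This is a simplified illustration under stated assumptions, not the GlobalStateManagerImpl code: topics are plain strings rather than TopicPartition objects, the class and method names are hypothetical, and entries for in-memory stores are simply dropped (a real fix might instead record a "no checkpoint" sentinel for them).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating the idea only.
final class CheckpointFilter {
    // Keeps only offsets whose topic backs a persistent store, so that an
    // in-memory store is always restored from the beginning after a restart.
    static Map<String, Long> persistentOnly(Map<String, Long> offsetsByTopic,
                                            Set<String> inMemoryStoreTopics) {
        Map<String, Long> filtered = new HashMap<>();
        for (Map.Entry<String, Long> entry : offsetsByTopic.entrySet()) {
            if (!inMemoryStoreTopics.contains(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

Only the filtered map would then be written to the checkpoint file.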



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5098) KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char

2018-06-14 Thread Ahmed Al-Mehdi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513059#comment-16513059
 ] 

Ahmed Al-Mehdi commented on KAFKA-5098:
---

Since there was no response from the current Jira owner, I went ahead and 
created a PR for this issue.

> KafkaProducer.send() blocks and generates TimeoutException if topic name has 
> illegal char
> -
>
> Key: KAFKA-5098
> URL: https://issues.apache.org/jira/browse/KAFKA-5098
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
> Environment: Java client running against server using 
> wurstmeister/kafka Docker image.
>Reporter: Jeff Larsen
>Assignee: huxihx
>Priority: Major
> Fix For: 2.1.0
>
>
> The server is running with auto create enabled. If we try to publish to a 
> topic with a forward slash in the name, the call blocks and we get a 
> TimeoutException in the Callback. I would expect it to return immediately 
> with an InvalidTopicException.
> There are other blocking issues that have been reported which may be related 
> to some degree, but this particular cause seems unrelated.
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
> Properties props = new Properties();
> props.put("bootstrap.servers", "kafka.example.com:9092");
> props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> props.put("max.block.ms", 10000); // 10 seconds should illustrate our point
> String separator = "/";
> //String separator = "_";
> try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>   System.out.println("Calling KafkaProducer.send() at " + new Date());
>   producer.send(
>   new ProducerRecord<String, String>("abc" + separator + "someStreamName",
>   "Not expecting a TimeoutException here"),
>   new Callback() {
> @Override
> public void onCompletion(RecordMetadata metadata, Exception e) {
>   if (e != null) {
> System.out.println(e.toString());
>   }
> }
>   });
>   System.out.println("KafkaProducer.send() completed at " + new Date());
> }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the 
> TimeoutException we encountered did not help point us in the right direction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-14 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513045#comment-16513045
 ] 

Ted Yu commented on KAFKA-7012:
---

bq. not treating this case at all and suffering the (rare?) timeout?

Should be acceptable until a better solution (with no performance penalty) is 
found.

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If the number of 
> network threads is set to 8, then the cpu usage is around 850% (9 vcpus) and 
> if it is set to 4 then the cpu usage is around 450% (5 vcpus). Using the same 
> kafka server.properties for both.
> Did further analysis with git bisect and a couple of builds and deploys, and traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine 
> for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit 
> 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have 
> attached screenshots of profiling done with both the commits. Screenshot 
> Commit-f15cdbc91b-profile shows less cpu usage by network threads and 
> Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show 
> higher cpu usage(almost entire cpu usage) by network threads. Also noticed 
> that kafka.network.Processor.poll() method is invoked 10 times more with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need the issue to be resolved to upgrade the cluster. Please let me know 
> if you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-14 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513037#comment-16513037
 ] 

radai rosenblatt edited comment on KAFKA-7012 at 6/14/18 9:46 PM:
--

i don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more 
complicated condition for when a key (channel) gets picked into 
`keysWithBufferedRead`. the condition is currently
{code}
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
{code}
which results in lots of "false positives" - keys that have something left over 
in ssl buffers (likely, since request sizes are rarely a multiple of ssl cipher 
block sizes) that cause the next poll() cycle to be inefficient.

the condition needs to check if a channel has something left *that could not 
be read out due to memory pressure*.

alternatively - the doomsday scenario this is meant to handle is pretty rare: 
if a channel has a request fully inside the ssl buffers that cannot be read due 
to memory pressure, *and* the underlying channel will never have any more 
incoming bytes (so will never come back from select) the request will sit there 
and rot, resulting in a client timeout.

the alternative to making the condition more complicated is not treating this 
case at all and suffering the (rare?) timeout?


was (Author: radai):
i don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more 
complicated condition for when a key (channel) gets picked into 
`keysWithBufferedRead`. the condition is currently
```java
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
```
which results in lots of "false positives" - keys that have something left over 
in ssl buffers (likely, since request sizes are rarely a multiple of ssl cipher 
block sizes) that cause the next poll() cycle to be inefficient.

the condition needs to check if a channel has something left *that could not 
be read out due to memory pressure*.

alternatively - the doomsday scenario this is meant to handle is pretty rare: 
if a channel has a request fully inside the ssl buffers that cannot be read due 
to memory pressure, *and* the underlying channel will never have any more 
incoming bytes (so will never come back from select) the request will sit there 
and rot, resulting in a client timeout.

the alternative to making the condition more complicated is not treating this 
case at all and suffering the (rare?) timeout?

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If the number of 
> network threads is set to 8, then the cpu usage is around 850% (9 vcpus) and 
> if it is set to 4 then the cpu usage is around 450% (5 vcpus). Using the same 
> kafka server.properties for both.
> Did further analysis with git bisect and a couple of builds and deploys, and traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU 

[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-14 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513037#comment-16513037
 ] 

radai rosenblatt commented on KAFKA-7012:
-

i don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more 
complicated condition for when a key (channel) gets picked into 
`keysWithBufferedRead`. the condition is currently
```java
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
```
which results in lots of "false positives" - keys that have something left over 
in ssl buffers (likely, since request sizes are rarely a multiple of ssl cipher 
block sizes) that cause the next poll() cycle to be inefficient.

the condition needs to check if a channel has something left *that could not 
be read out due to memory pressure*.

alternatively - the doomsday scenario this is meant to handle is pretty rare: 
if a channel has a request fully inside the ssl buffers that cannot be read due 
to memory pressure, *and* the underlying channel will never have any more 
incoming bytes (so will never come back from select) the request will sit there 
and rot, resulting in a client timeout.

the alternative to making the condition more complicated is not treating this 
case at all and suffering the (rare?) timeout?
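
The stricter condition described in this comment can be sketched as follows. This is only an illustration of the idea, not the Selector implementation: the class name, the string keys, and in particular the "blocked on memory" flag are assumptions, since the comment does not say how a channel would report that its last read stopped for lack of memory.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tracker; keys are plain strings instead of SelectionKey objects.
final class BufferedReadTracker {
    private final Set<String> keysWithBufferedRead = new HashSet<>();

    // Remember a key only when leftover bytes exist AND the read was cut short
    // by memory pressure (assumed flag). Leftover SSL bytes alone are ignored,
    // which avoids the "false positives" described in the comment.
    void afterRead(String key, boolean hasBytesBuffered, boolean blockedOnMemory) {
        if (hasBytesBuffered && blockedOnMemory) {
            keysWithBufferedRead.add(key);
        } else {
            keysWithBufferedRead.remove(key);
        }
    }

    // True if the next poll() must revisit this key even without a select event.
    boolean mustRevisit(String key) {
        return keysWithBufferedRead.contains(key);
    }
}
```

With this shape, a channel that merely has a partial SSL record buffered no longer forces an immediate extra poll() cycle.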

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If the number of 
> network threads is set to 8, then the cpu usage is around 850% (9 vcpus) and 
> if it is set to 4 then the cpu usage is around 450% (5 vcpus). Using the same 
> kafka server.properties for both.
> Did further analysis with git bisect and a couple of builds and deploys, and traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine 
> for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit 
> 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have 
> attached screenshots of profiling done with both the commits. Screenshot 
> Commit-f15cdbc91b-profile shows less cpu usage by network threads and 
> Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show 
> higher cpu usage(almost entire cpu usage) by network threads. Also noticed 
> that kafka.network.Processor.poll() method is invoked 10 times more with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need the issue to be resolved to upgrade the cluster. Please let me know 
> if you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513030#comment-16513030
 ] 

ASF GitHub Bot commented on KAFKA-6975:
---

hachikuji closed pull request #5229: KAFKA-6975: Fix replica fetching from 
non-batch-aligned log start offset
URL: https://github.com/apache/kafka/pull/5229
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 68faf00c079..9339d29202f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.LeaderAndIsr
 import kafka.api.Request
+import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.metrics.KafkaMetricsGroup
@@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.AdminZkClient
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, 
NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, 
NotEnoughReplicasException, NotLeaderForPartitionException, 
PolicyViolationException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
@@ -187,6 +188,10 @@ class Partition(val topic: String,
 
   def getReplica(replicaId: Int = localBrokerId): Option[Replica] = 
Option(allReplicasMap.get(replicaId))
 
+  def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
+getReplica(replicaId).getOrElse(
+  throw new ReplicaNotAvailableException(s"Replica $replicaId is not 
available for partition $topicPartition"))
+
   def leaderReplicaIfLocal: Option[Replica] =
 leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
 
@@ -549,15 +554,41 @@ class Partition(val topic: String,
 laggingReplicas
   }
 
-  def appendRecordsToFutureReplica(records: MemoryRecords) {
-
getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records)
+  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, 
isFuture: Boolean): Unit = {
+  if (isFuture)
+
getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
+  else {
+// The read lock is needed to prevent the follower replica from being 
updated while ReplicaAlterDirThread
+// is executing maybeDeleteAndSwapFutureReplica() to replace follower 
replica with the future replica.
+inReadLock(leaderIsrUpdateLock) {
+   getReplicaOrException().log.get.appendAsFollower(records)
+}
+  }
   }
 
-  def appendRecordsToFollower(records: MemoryRecords) {
-// The read lock is needed to prevent the follower replica from being 
updated while ReplicaAlterDirThread
-// is executing maybeDeleteAndSwapFutureReplica() to replace follower 
replica with the future replica.
-inReadLock(leaderIsrUpdateLock) {
-  getReplica().get.log.get.appendAsFollower(records)
+  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: 
Boolean) {
+try {
+  doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+} catch {
+  case e: UnexpectedAppendOffsetException =>
+val replica = if (isFuture) 
getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException()
+val logEndOffset = replica.logEndOffset.messageOffset
+if (logEndOffset == replica.logStartOffset &&
+e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
+  // This may happen if the log start offset on the leader (or current 
replica) falls in
+  // the middle of the batch due to delete records request and the 
follower tries to
+  // fetch its first offset from the leader.
+  // We handle this case here instead of Log#append() because we will 
need to remove the
+  // segment that start with log start offset and create a new one 
with earlier offset
+  // (base offset of the batch), which will move recoveryPoint 
backwards, so we will need
+  // to checkpoint the new recovery point before we append
+  val replicaName = if (isFuture) "future replica" else "follower"
+  info(s"Unexpected offset in append to $topicPartition. First offset 
${e.firstOffset} is less than log start offset 

[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513013#comment-16513013
 ] 

ASF GitHub Bot commented on KAFKA-7021:
---

guozhangwang opened a new pull request #5232: KAFKA-7021: checkpoint offsets 
from committed
URL: https://github.com/apache/kafka/pull/5232
 
 
   This is a cherry-pick PR from https://github.com/apache/kafka/pull/5207
   
   1) add the committed offsets to checkpointable offset map.
   
   2) add the restoration integration test for the source KTable case.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Source KTable checkpoint is not correct
> ---
>
> Key: KAFKA-7021
> URL: https://issues.apache.org/jira/browse/KAFKA-7021
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Kafka Streams treats source KTables, i.e., tables created via `builder.table()`, 
> differently. Instead of creating a changelog topic, the original source topic 
> is used to avoid unnecessary data redundancy.
> However, Kafka Streams does not write a correct local state checkpoint file. 
> This results in unnecessary state restore after a rebalance. Instead of the 
> latest committed offset, the latest restored offset is written into the 
> checkpoint file in `ProcessorStateManager#close()`
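
The intended behaviour (committed offsets taking precedence over restored offsets when the checkpoint is written) can be sketched as below. This is a simplified illustration, not the ProcessorStateManager code: partitions are represented as plain strings and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the merge order only.
final class SourceTableCheckpoint {
    // Starts from the restored offsets and lets committed offsets override them,
    // so the checkpoint reflects the latest committed position per partition.
    static Map<String, Long> checkpointable(Map<String, Long> restoredOffsets,
                                            Map<String, Long> committedOffsets) {
        Map<String, Long> result = new HashMap<>(restoredOffsets);
        result.putAll(committedOffsets);
        return result;
    }
}
```

A partition that was restored up to offset 100 and later committed at offset 250 would then be checkpointed at 250, which avoids replaying the range in between after a rebalance.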



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7010) Rename ResourceNameType.ANY to MATCH

2018-06-14 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7010.

Resolution: Fixed

merged the PR to trunk and 2.0 branch.

> Rename ResourceNameType.ANY to MATCH
> 
>
> Key: KAFKA-7010
> URL: https://issues.apache.org/jira/browse/KAFKA-7010
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Following on from the PR 
> [#5117|[https://github.com/apache/kafka/pull/5117]...] and discussions with 
> Colin McCabe...
> The current ResourceNameType.ANY may be misleading as it performs pattern 
> matching for wildcard and prefixed bindings, whereas ResourceName.ANY just 
> brings back any resource name.
> Renaming to ResourceNameType.MATCH and adding more Javadoc should clear this 
> up.
> Finally, ResourceNameType is no longer appropriate as the type is used in 
> ResourcePattern and ResourcePatternFilter. Hence rename to PatternType.
>  
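
To make the "pattern matching for wildcard and prefixed bindings" concrete, here is a small sketch of what a MATCH-style lookup has to decide for each stored binding. It is an illustration only: the enum, the "*" wildcard constant, and the method are hypothetical stand-ins and are not the Kafka classes named in this issue.

```java
// Hypothetical model of stored bindings; not the Kafka resource classes.
final class PatternMatchSketch {
    enum PatternType { LITERAL, PREFIXED }

    // Assumed wildcard name that applies to every resource.
    static final String WILDCARD = "*";

    // True if a binding stored as (type, pattern) applies to the concrete resource name.
    static boolean matches(PatternType type, String pattern, String resourceName) {
        switch (type) {
            case LITERAL:
                // A literal binding applies to an exact name or via the wildcard.
                return pattern.equals(WILDCARD) || pattern.equals(resourceName);
            case PREFIXED:
                // A prefixed binding applies to every name starting with the pattern.
                return resourceName.startsWith(pattern);
            default:
                return false;
        }
    }
}
```

A filter with "match" semantics would return every binding for which matches(...) is true, which is a different operation from returning bindings of any pattern type regardless of name.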



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-14 Thread rajadayalan perumalsamy (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512935#comment-16512935
 ] 

rajadayalan perumalsamy commented on KAFKA-7012:


Thanks, are you planning a PR to change the behaviour of polling buffered keys? 

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If the number of 
> network threads is set to 8, then the cpu usage is around 850% (9 vcpus) and 
> if it is set to 4 then the cpu usage is around 450% (5 vcpus). Using the same 
> kafka server.properties for both.
> Did further analysis with git bisect and a couple of builds and deploys, and traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine 
> for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit 
> 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have 
> attached screenshots of profiling done with both the commits. Screenshot 
> Commit-f15cdbc91b-profile shows less cpu usage by network threads and 
> Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show 
> higher cpu usage(almost entire cpu usage) by network threads. Also noticed 
> that kafka.network.Processor.poll() method is invoked 10 times more with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need the issue to be resolved to upgrade the cluster. Please let me know 
> if you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task

2018-06-14 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512881#comment-16512881
 ] 

Matthias J. Sax commented on KAFKA-6583:


Yes. KIP-268 is done.

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> Currently, for clients to be more evenly balanced, stateful tasks should be 
> spread equally across them. However, for task assignment to be aware of 
> this, the present rebalance protocol metadata would also need to contain the 
> number of state stores in each task. This would allow us to "weight" tasks 
> during assignment. 
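
One way to picture the "weighting" is a greedy assignment that places the heaviest tasks first. This is a sketch under assumptions, not the Streams partition assignor: the class name is hypothetical, tasks and clients are plain strings, and the weight of a task is assumed to be 1 plus its number of state stores so that stateless tasks still count.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical greedy assignor for illustration only.
final class WeightedAssignor {
    static Map<String, List<String>> assign(Map<String, Integer> storesPerTask,
                                            List<String> clients) {
        if (clients.isEmpty()) {
            throw new IllegalArgumentException("at least one client is required");
        }
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        Map<String, Integer> load = new HashMap<>();
        for (String client : clients) {
            assignment.put(client, new ArrayList<>());
            load.put(client, 0);
        }
        // Heaviest tasks first; ties are broken by task name for determinism.
        List<String> tasks = new ArrayList<>(storesPerTask.keySet());
        tasks.sort((a, b) -> {
            int byWeight = Integer.compare(storesPerTask.get(b), storesPerTask.get(a));
            return byWeight != 0 ? byWeight : a.compareTo(b);
        });
        for (String task : tasks) {
            // Pick the client with the smallest accumulated weight so far.
            String target = clients.get(0);
            for (String client : clients) {
                if (load.get(client) < load.get(target)) {
                    target = client;
                }
            }
            assignment.get(target).add(task);
            load.put(target, load.get(target) + 1 + storesPerTask.get(task));
        }
        return assignment;
    }
}
```

The store counts passed in are exactly the piece of information the issue proposes adding to the rebalance metadata; without them every task would have to be treated as equally heavy.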



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2983) Remove old Scala consumer and all related code, tests, and tools

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512864#comment-16512864
 ] 

ASF GitHub Bot commented on KAFKA-2983:
---

ijuma opened a new pull request #5230: KAFKA-2983: Remove Scala consumers and 
related code
URL: https://github.com/apache/kafka/pull/5230
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old Scala consumer and all related code, tests, and tools
> 
>
> Key: KAFKA-2983
> URL: https://issues.apache.org/jira/browse/KAFKA-2983
> Project: Kafka
>  Issue Type: Task
>Reporter: Grant Henke
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.0.0
>
>






[jira] [Commented] (KAFKA-6982) java.lang.ArithmeticException: / by zero

2018-06-14 Thread wade wu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512826#comment-16512826
 ] 

wade wu commented on KAFKA-6982:


You can manually add that line of code and build Kafka.


-- 
Best Regards
 吴清俊|Wade Wu


> java.lang.ArithmeticException: / by zero
> 
>
> Key: KAFKA-6982
> URL: https://issues.apache.org/jira/browse/KAFKA-6982
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.1.0
> Environment: Environment: Windows 10. 
>Reporter: wade wu
>Priority: Major
> Fix For: 1.1.1
>
>
> Producer keeps sending messages to Kafka, Kafka is down. 
> Server.log shows: 
> ..
> [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
> ..
>  
> This line of code in SocketServer.scala is causing the error: 
>     currentProcessor = currentProcessor % processors.size
>  
>  
>  
>  
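
The exception above is consistent with processors.size being zero when the
modulo is evaluated. The following is a minimal sketch of a guarded
round-robin selection, written in Java for illustration. RoundRobinPicker is
a hypothetical name, and this is not the actual change shipped in Kafka
1.1.1; it only shows the general technique of checking for an empty list
before taking the remainder.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only: not the code in SocketServer.scala.
final class RoundRobinPicker<T> {
    private int current = 0;

    // Returns the next element in round-robin order, or Optional.empty()
    // when the list is empty. In Java (and Scala), "x % 0" on integers
    // throws ArithmeticException: / by zero, hence the guard.
    Optional<T> next(List<T> processors) {
        if (processors.isEmpty()) {
            return Optional.empty();
        }
        current = current % processors.size();
        T picked = processors.get(current);
        current++;
        return Optional.of(picked);
    }
}
```

A caller would treat the empty result as "no processor available yet" and
retry later instead of failing the accept loop.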





[jira] [Commented] (KAFKA-6982) java.lang.ArithmeticException: / by zero

2018-06-14 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512824#comment-16512824
 ] 

Manikumar commented on KAFKA-6982:
--

This is a minor issue that only occurs for a brief period during server startup. 
Kafka 1.1.1 will be out by month end.

Alternatively, you can build the binary from the sources:
https://github.com/apache/kafka/tree/1.1

> java.lang.ArithmeticException: / by zero
> 
>
> Key: KAFKA-6982
> URL: https://issues.apache.org/jira/browse/KAFKA-6982
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.1.0
> Environment: Environment: Windows 10. 
>Reporter: wade wu
>Priority: Major
> Fix For: 1.1.1
>
>
> Producer keeps sending messages to Kafka, Kafka is down. 
> Server.log shows: 
> ..
> [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
> ..
>  
> This line of code in SocketServer.scala is causing the error: 
>     currentProcessor = currentProcessor % processors.size
>  
>  
>  
>  





[jira] [Resolved] (KAFKA-2837) FAILING TEST: kafka.api.ProducerBounceTest > testBrokerFailure

2018-06-14 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-2837.

   Resolution: Fixed
Fix Version/s: (was: 0.10.0.0)
   2.0.0

This test has been removed.

> FAILING TEST: kafka.api.ProducerBounceTest > testBrokerFailure 
> ---
>
> Key: KAFKA-2837
> URL: https://issues.apache.org/jira/browse/KAFKA-2837
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.9.0.0
>Reporter: Gwen Shapira
>Assignee: jin xing
>Priority: Major
>  Labels: newbie
> Fix For: 2.0.0
>
>
> {code}
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.junit.Assert.assertTrue(Assert.java:52)
>   at 
> kafka.api.ProducerBounceTest.testBrokerFailure(ProducerBounceTest.scala:117)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
>   at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>   at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>

[jira] [Comment Edited] (KAFKA-6982) java.lang.ArithmeticException: / by zero

2018-06-14 Thread Martin M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512810#comment-16512810
 ] 

Martin M edited comment on KAFKA-6982 at 6/14/18 5:42 PM:
--

Thanks [~omkreddy]. 

1.1.1 is not officially available for download.

How can we get the fix in 1.1.0?

thanks


was (Author: mar.ian):
Thanks [~omkreddy]. 

1.1.1 is not officially available for download.

How can we get the fix in 1.1.1?

thanks

> java.lang.ArithmeticException: / by zero
> 
>
> Key: KAFKA-6982
> URL: https://issues.apache.org/jira/browse/KAFKA-6982
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.1.0
> Environment: Environment: Windows 10. 
>Reporter: wade wu
>Priority: Major
> Fix For: 1.1.1
>
>
> Producer keeps sending messages to Kafka, Kafka is down. 
> Server.log shows: 
> ..
> [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
> ..
>  
> This line of code in SocketServer.scala is causing the error: 
>     currentProcessor = currentProcessor % processors.size
>  
>  
>  
>  





[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512813#comment-16512813
 ] 

ASF GitHub Bot commented on KAFKA-7021:
---

guozhangwang closed pull request #5207: KAFKA-7021: checkpoint offsets from 
committed
URL: https://github.com/apache/kafka/pull/5207
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 0b028e67381..0c611199d88 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -107,7 +107,6 @@ public InternalStreamsBuilder(final InternalTopologyBuilder 
internalTopologyBuil
  name);
 
 internalTopologyBuilder.addStateStore(storeBuilder, name);
-
internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic);
 
 return kTable;
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d9c827fff52..bf6ceded143 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -164,7 +164,7 @@ public String toString(final String indent) {
 return sb.toString();
 }
 
-protected Map recordCollectorOffsets() {
+protected Map activeTaskCheckpointableOffsets() {
 return Collections.emptyMap();
 }
 
@@ -239,7 +239,7 @@ void closeStateManager(final boolean writeCheckpoint) 
throws ProcessorStateExcep
 ProcessorStateException exception = null;
 log.trace("Closing state manager");
 try {
-stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() 
: null);
 } catch (final ProcessorStateException e) {
 exception = e;
 } finally {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 7d4b592b6a6..a9d5a93f228 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -122,7 +122,7 @@
 
 private Map> nodeGroups = null;
 
-interface StateStoreFactory {
+public interface StateStoreFactory {
 Set users();
 boolean loggingEnabled();
 StateStore build();
@@ -1883,4 +1883,10 @@ public void updateSubscribedTopics(final Set 
topics, final String logPre
 subscriptionUpdates.updateTopics(topics);
 updateSubscriptions(subscriptionUpdates, logPrefix);
 }
+
+// following functions are for test only
+
+public synchronized Map getStateStores() {
+return stateFactories;
+}
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 195ea99f62d..b0761ac5507 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -46,7 +46,7 @@
 private final boolean isStandby;
 private final ChangelogReader changelogReader;
 private final Map offsetLimits;
-private final Map restoredOffsets;
+private final Map standbyRestoredOffsets;
 private final Map restoreCallbacks; // used 
for standby tasks, keyed by state topic name
 private final Map storeToChangelogTopic;
 private final List changelogPartitions = new ArrayList<>();
@@ -79,9 +79,9 @@ public ProcessorStateManager(final TaskId taskId,
 partitionForTopic.put(source.topic(), source);
 }
 offsetLimits = new HashMap<>();
-restoredOffsets = new HashMap<>();
+standbyRestoredOffsets = new HashMap<>();
 this.isStandby = isStandby;
-restoreCallbacks = isStandby ? new HashMap<>() : null;
+restoreCallbacks = isStandby ? new HashMap() : null;
 this.storeToChangelogTopic = 

[jira] [Assigned] (KAFKA-4237) Avoid long request timeout for the consumer

2018-06-14 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-4237:
--

Assignee: Jason Gustafson

> Avoid long request timeout for the consumer
> ---
>
> Key: KAFKA-4237
> URL: https://issues.apache.org/jira/browse/KAFKA-4237
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> In the consumer rebalance protocol, the JoinGroup can stay in purgatory on 
> the server for as long as the rebalance timeout. For the Java client, that 
> means that the request timeout must be at least as large as the rebalance 
> timeout (which is governed by {{max.poll.interval.ms}} since KIP-62 and 
> {{session.timeout.ms}} before then). By default, since 0.10.1, this is 5 
> minutes plus some change, which makes the clients slow to detect some hard 
> failures.
> To fix this, two options come to mind:
> 1. Right now, all request APIs are limited by the same request timeout in 
> {{NetworkClient}}, but there's not really any reason why this must be so. We 
> could use a separate timeout for the JoinGroup request (the implementation 
> of this is straightforward: 
> https://github.com/confluentinc/kafka/pull/108/files).
> 2. Alternatively, we could prevent the server from holding the JoinGroup in 
> purgatory for such a long time. Instead, it could return early from the 
> JoinGroup (say before the session timeout has expired) with an error code 
> (e.g. REBALANCE_IN_PROGRESS), which tells the client that it should just 
> resend the JoinGroup.
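
Option 1 above can be sketched as a per-request-type timeout lookup that
falls back to a shared default. This is a hedged illustration only:
RequestTimeouts, ApiType and the numeric values are hypothetical and do not
reflect the actual NetworkClient API.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical illustration only: not the NetworkClient implementation.
final class RequestTimeouts {
    enum ApiType { FETCH, HEARTBEAT, JOIN_GROUP }

    private final int defaultTimeoutMs;
    private final Map<ApiType, Integer> overrides = new EnumMap<>(ApiType.class);

    RequestTimeouts(int defaultTimeoutMs) {
        this.defaultTimeoutMs = defaultTimeoutMs;
    }

    // Registers a timeout that applies to a single request type only.
    RequestTimeouts override(ApiType api, int timeoutMs) {
        overrides.put(api, timeoutMs);
        return this;
    }

    // Returns the override for this request type, or the shared default.
    int timeoutMs(ApiType api) {
        return overrides.getOrDefault(api, defaultTimeoutMs);
    }

    public static void main(String[] args) {
        // Illustrative values: a short default, and a JoinGroup timeout of
        // the rebalance timeout (5 minutes) plus a small margin.
        RequestTimeouts timeouts = new RequestTimeouts(30_000)
            .override(ApiType.JOIN_GROUP, 300_000 + 5_000);
        System.out.println(timeouts.timeoutMs(ApiType.FETCH));
        System.out.println(timeouts.timeoutMs(ApiType.JOIN_GROUP));
    }
}
```

The point of the design is that only JoinGroup needs to outlive the
rebalance timeout, so all other requests can keep a short timeout and
detect hard failures quickly.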





[jira] [Commented] (KAFKA-6982) java.lang.ArithmeticException: / by zero

2018-06-14 Thread Martin M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512810#comment-16512810
 ] 

Martin M commented on KAFKA-6982:
-

Thanks [~omkreddy]. 

1.1.1 is not officially available for download.

How can we get the fix in 1.1.1?

thanks

> java.lang.ArithmeticException: / by zero
> 
>
> Key: KAFKA-6982
> URL: https://issues.apache.org/jira/browse/KAFKA-6982
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.1.0
> Environment: Environment: Windows 10. 
>Reporter: wade wu
>Priority: Major
> Fix For: 1.1.1
>
>
> Producer keeps sending messages to Kafka, Kafka is down. 
> Server.log shows: 
> ..
> [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
> ..
>  
> This line of code in SocketServer.scala is causing the error: 
>     currentProcessor = currentProcessor % processors.size
>  
>  
>  
>  





[jira] [Commented] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512809#comment-16512809
 ] 

ASF GitHub Bot commented on KAFKA-6975:
---

apovzner opened a new pull request #5229: KAFKA-6975; Fix replica fetching from 
non-batch-aligned log start offset
URL: https://github.com/apache/kafka/pull/5229
 
 
   It is possible that the log start offset may fall in the middle of a batch 
after AdminClient#deleteRecords(). This will cause a follower starting from the log 
start offset to fail fetching (all records). Use cases in which a follower will 
start fetching from the log start offset include: 1) a new replica due to partition 
re-assignment; 2) a new local replica created as a result of 
AdminClient#AlterReplicaLogDirs(); 3) a broker that was down for some time while 
AdminClient#deleteRecords() moved the log start offset beyond its HW.
   
   Added two integration tests:
   1) Produce and then AdminClient#deleteRecords() while one of the followers 
is down, and then restart of the follower requires fetching from log start 
offset;
   2)  AdminClient#AlterReplicaLogDirs() after AdminClient#deleteRecords()
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
> -
>
> Key: KAFKA-6975
> URL: https://issues.apache.org/jira/browse/KAFKA-6975
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Blocker
> Fix For: 2.0.0
>
>
> AdminClient.deleteRecords(beforeOffset(offset)) will set log start offset to 
> the requested offset. If the requested offset is in the middle of the batch, 
> the replica will not be able to fetch from that offset (because it is in the 
> middle of the batch). 
> One use-case where this could cause problems is replica re-assignment. 
> Suppose we have a topic partition with 3 initial replicas, and at some point 
> the user issues  AdminClient.deleteRecords() for the offset that falls in the 
> middle of the batch. It now becomes log start offset for this topic 
> partition. Suppose at some later time, the user starts partition 
> re-assignment to 3 new replicas. The new replicas (followers) will start with 
> HW = 0, will try to fetch from 0, then get "out of order offset" because 0 < 
> log start offset (LSO); the follower will be able to reset offset to LSO of 
> the leader and fetch LSO; the leader will send a batch in response with base 
> offset < LSO, which will stop the fetcher thread. The end result is that the new 
> replicas will not be able to start fetching unless LSO moves to an offset that 
> is not in the middle of the batch, and the re-assignment will be stuck for a 
> possibly very long time. 
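
The condition described above (log start offset falling strictly inside a
batch) can be sketched as follows. This is an illustration under stated
assumptions: Batch and BatchAlignment are hypothetical types that model a
batch only by its base and last offsets; they are not Kafka's record batch
classes.

```java
import java.util.List;

// Hypothetical illustration only: not Kafka's log or record batch code.
final class BatchAlignment {

    // A batch modelled only by the offsets of its first and last records.
    static final class Batch {
        final long baseOffset;
        final long lastOffset;

        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    // True if logStartOffset lies inside a batch but is not that batch's
    // base offset, i.e. a fetch from it returns a batch whose base offset
    // is smaller than the requested offset.
    static boolean isMidBatch(List<Batch> batches, long logStartOffset) {
        for (Batch batch : batches) {
            if (logStartOffset > batch.baseOffset && logStartOffset <= batch.lastOffset) {
                return true;
            }
        }
        return false;
    }
}
```

For batches covering offsets 0-9 and 10-19, a log start offset of 15 is
mid-batch, while 10 is batch-aligned.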





[jira] [Comment Edited] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-14 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510295#comment-16510295
 ] 

Eugen Feller edited comment on KAFKA-6977 at 6/14/18 5:38 PM:
--

Quick update. I tried the 1.1.0 client and ran into the issue. I believe in 
the end two things helped to make the job stable:

1) Reset offset to latest. That helped with CRC checksum issues.

2) To help with continuous rebalancing I have adjusted max.poll.interval.ms and 
other settings to:
{code:java}
requestTimeout: Duration = 40 seconds,
maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS),
maxPollRecords: Long = 500,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 30 seconds,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 1,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L
{code}
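
For readers who configure Kafka Streams through plain properties, the
settings above might translate roughly as follows. This is a sketch under
assumptions: the mapping from the field names in the comment to Kafka
configuration keys is inferred, StreamsSettings is a hypothetical helper,
and fetch.max.bytes is omitted because the comment leaves it at the default.

```java
import java.util.Properties;

// Hypothetical helper; the key mapping is an assumption, not the author's code.
final class StreamsSettings {

    static Properties build() {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", "40000");
        // Effectively disables the poll-interval based rebalance trigger.
        props.setProperty("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        props.setProperty("max.poll.records", "500");
        props.setProperty("fetch.max.wait.ms", "30000");
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("heartbeat.interval.ms", "5000");
        props.setProperty("state.cleanup.delay.ms", String.valueOf(Long.MAX_VALUE));
        props.setProperty("num.stream.threads", "1");
        props.setProperty("num.standby.replicas", "1");
        // 128 MiB of record cache across all threads.
        props.setProperty("cache.max.bytes.buffering", String.valueOf(128L * 1024L * 1024L));
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```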


was (Author: efeller):
Quick update. I tried the 1.1.0 client and ran into the issue. I believe in 
the end two things helped to make the job stable:

1) Reset offset to latest. That helped with CRC checksum issues.

2) Adjust max.poll.interval.ms and other settings to:
{code:java}
requestTimeout: Duration = 40 seconds,
maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS),
maxPollRecords: Long = 500,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 30 seconds,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 1,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L
{code}

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> partition assignment took 40 ms.
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
> 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 
> 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
> current standby tasks: []
> previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 
> 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State 
> transition from REBALANCING to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> ERROR org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> Encountered the following error during processing:
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> 

[jira] [Comment Edited] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-14 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510295#comment-16510295
 ] 

Eugen Feller edited comment on KAFKA-6977 at 6/14/18 5:37 PM:
--

Quick update. I tried the 1.1.0 client and ran into the issue. I believe in 
the end two things helped to make the job stable:

1) Reset offset to latest. That helped with CRC checksum issues.

2) Adjust max.poll.interval.ms and other settings to:
{code:java}
requestTimeout: Duration = 40 seconds,
maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS),
maxPollRecords: Long = 500,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 30 seconds,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 1,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L
{code}
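
For readers who configure Kafka Streams through plain properties, the settings above can be sketched as follows. The field names in the comment belong to the reporter's own wrapper, so the mapping to configuration keys is an assumption, as is the literal used for ConsumerConfig.DEFAULT_FETCH_MAX_BYTES (50 MiB); the class and method names are hypothetical.

```java
import java.util.Properties;

class StreamsSettingsSketch {
    // Hypothetical helper: translates the wrapper fields from the comment
    // into the Kafka configuration keys they most likely correspond to.
    static Properties settings() {
        Properties p = new Properties();
        p.setProperty("request.timeout.ms", "40000");                              // requestTimeout = 40 seconds
        p.setProperty("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));  // maxPollInterval
        p.setProperty("max.poll.records", "500");                                  // maxPollRecords
        // Assumed value of ConsumerConfig.DEFAULT_FETCH_MAX_BYTES (50 MiB).
        p.setProperty("fetch.max.bytes", String.valueOf(50 * 1024 * 1024));
        p.setProperty("fetch.max.wait.ms", "30000");                               // fetchMaxWait = 30 seconds
        p.setProperty("session.timeout.ms", "30000");                              // sessionTimeout = 30 seconds
        p.setProperty("heartbeat.interval.ms", "5000");                            // heartbeatInterval = 5 seconds
        p.setProperty("state.cleanup.delay.ms", String.valueOf(Long.MAX_VALUE));   // stateDirCleanupDelay
        p.setProperty("num.stream.threads", "1");                                  // numStreamThreads
        p.setProperty("num.standby.replicas", "1");                                // numStandbyReplicas
        p.setProperty("cache.max.bytes.buffering",
                String.valueOf(128L * 1024L * 1024L));                             // cacheMaxBytesBuffering
        return p;
    }

    public static void main(String[] args) {
        // Print each key/value pair so the resulting configuration can be inspected.
        settings().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object could be passed to a StreamsConfig or KafkaStreams constructor; that step is omitted here so the sketch does not depend on the Kafka client jars.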


was (Author: efeller):
Quick update: I tried the 1.1.0 client and ran into the same issue. I believe that, 
in the end, two things helped make the job stable:

1) Reset offset to latest. That helped with CRC checksum issues.

2) Adjust max.poll.interval.ms and other settings to:
{code:java}
requestTimeout: Duration = 40 seconds,
maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS),
maxPollRecords: Long = 500,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 30 seconds,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L
{code}

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> partition assignment took 40 ms.
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
> 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 
> 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
> current standby tasks: []
> previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 
> 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State 
> transition from REBALANCING to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> ERROR org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> Encountered the following error during processing:
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> 

[jira] [Updated] (KAFKA-6982) java.lang.ArithmeticException: / by zero

2018-06-14 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-6982:
-
Fix Version/s: (was: 2.0.0)
   1.1.1

java.lang.ArithmeticException was fixed in 1.1.1 via KAFKA-6893. 

> java.lang.ArithmeticException: / by zero
> 
>
> Key: KAFKA-6982
> URL: https://issues.apache.org/jira/browse/KAFKA-6982
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.1.0
> Environment: Environment: Windows 10. 
>Reporter: wade wu
>Priority: Major
> Fix For: 1.1.1
>
>
> Producer keeps sending messages to Kafka, Kafka is down. 
> Server.log shows: 
> ..
> [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
> ..
>  
> This line of code in SocketServer.scala is causing the error: 
> currentProcessor = currentProcessor % processors.size
>  
>  
>  
>  
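
The quoted line computes a remainder by processors.size. On the JVM, an integer remainder with a zero divisor throws ArithmeticException with the message "/ by zero", which matches the log above if the processor list was empty at that point (an inference from the report, not a confirmed diagnosis). A minimal Java sketch of the failure and of a guard follows; nextIndex and the class name are hypothetical and are not Kafka code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class RoundRobinSketch {
    // Hypothetical helper, not Kafka code: advance a round-robin index.
    // The empty case is rejected explicitly, because "% 0" on ints
    // would otherwise throw ArithmeticException("/ by zero").
    static int nextIndex(int current, List<?> processors) {
        if (processors.isEmpty()) {
            throw new IllegalStateException("no processors available");
        }
        return (current + 1) % processors.size();
    }

    public static void main(String[] args) {
        // Wraps around from the last index back to the first.
        System.out.println(nextIndex(2, Arrays.asList("p0", "p1", "p2")));
        try {
            int size = Collections.emptyList().size();
            System.out.println(5 % size); // unguarded remainder by zero
        } catch (ArithmeticException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Whether to fail fast, skip the connection, or wait for processors to appear is a design choice; the sketch only shows that the unguarded expression cannot be evaluated when the size is zero.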



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-14 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510295#comment-16510295
 ] 

Eugen Feller edited comment on KAFKA-6977 at 6/14/18 5:18 PM:
--

Quick update: I tried the 1.1.0 client and ran into the same issue. I believe that, 
in the end, two things helped make the job stable:

1) Reset offset to latest. That helped with CRC checksum issues.

2) Adjust max.poll.interval.ms and other settings to:
{code:java}
requestTimeout: Duration = 40 seconds,
maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS),
maxPollRecords: Long = 500,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 30 seconds,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L
{code}


was (Author: efeller):
Quick update. I have tried with 1.1.0 client and same issue.

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> partition assignment took 40 ms.
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
> 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 
> 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
> current standby tasks: []
> previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 
> 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State 
> transition from REBALANCING to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> ERROR org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> Encountered the following error during processing:
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Shutting down
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from RUNNING to PENDING_SHUTDOWN.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO 

[jira] [Updated] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-14 Thread Eugen Feller (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugen Feller updated KAFKA-6977:

Description: 
We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and constantly 
run into the following exception: 
{code:java}
[visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
partition assignment took 40 ms.
current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 0_18, 
0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
current standby tasks: []
previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
[visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
State transition from PARTITIONS_ASSIGNED to RUNNING.
[visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
INFO org.apache.kafka.streams.KafkaStreams - stream-client 
[visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State transition 
from REBALANCING to RUNNING.
[visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
Encountered the following error during processing:
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
java.lang.IllegalStateException: Unexpected error code 2 while fetching data
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
Shutting down
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
State transition from RUNNING to PENDING_SHUTDOWN.
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka 
producer with timeoutMillis = 9223372036854775807 ms.
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
Stream thread shutdown complete
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread 
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
State transition from PENDING_SHUTDOWN to DEAD.
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
INFO org.apache.kafka.streams.KafkaStreams - stream-client 
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] State transition 
from RUNNING to ERROR.
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
WARN org.apache.kafka.streams.KafkaStreams - stream-client 
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] All stream 
threads have died. The Kafka Streams instance will be in an error state and 
should be closed.
6062195 
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
FATAL com.zenreach.data.flows.visitstatsmongoexporter.MongoVisitStatsWriter$ - 
Exiting main on uncaught exception
java.lang.IllegalStateException: Unexpected error code 2 while fetching data
at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
at 

[jira] [Commented] (KAFKA-6982) java.lang.ArithmeticException: / by zero

2018-06-14 Thread Martin M (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512758#comment-16512758
 ] 

Martin M commented on KAFKA-6982:
-

How can we get a patch for 1.1.0?

Thanks 

> java.lang.ArithmeticException: / by zero
> 
>
> Key: KAFKA-6982
> URL: https://issues.apache.org/jira/browse/KAFKA-6982
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.1.0
> Environment: Environment: Windows 10. 
>Reporter: wade wu
>Priority: Major
> Fix For: 2.0.0
>
>
> Producer keeps sending messages to Kafka, Kafka is down. 
> Server.log shows: 
> ..
> [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, 
> dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to 
> log file 
> D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to 
> Corrupt index found, index file 
> (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has 
> non-zero size but the last offset is 0 which is no greater than the base 
> offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
>  [2018-06-01 17:01:34,664] ERROR Error while accepting connection 
> (kafka.network.Acceptor)
>  java.lang.ArithmeticException: / by zero
>  at kafka.network.Acceptor.run(SocketServer.scala:354)
>  at java.lang.Thread.run(Thread.java:748)
> ..
>  
> This line of code in SocketServer.scala is causing the error: 
> currentProcessor = currentProcessor % processors.size
>  
>  
>  
>  





[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware

2018-06-14 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512722#comment-16512722
 ] 

Richard Yu commented on KAFKA-4696:
---

Hi, I will probably restart my work on this.

> Streams standby task assignment should be state-store aware
> ---
>
> Key: KAFKA-4696
> URL: https://issues.apache.org/jira/browse/KAFKA-4696
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Damian Guy
>Assignee: Richard Yu
>Priority: Major
>
> Task Assignment is currently not aware of which tasks have State Stores. This 
> can result in uneven balance of standby task assignment as all tasks are 
> assigned, but only those tasks with state-stores are ever created by 
> {{StreamThread}}. So what seems like an optimal strategy during assignment 
> time could be sub-optimal post-assignment.
> For example, let's say we have 4 tasks (2 with state-stores), 2 clients, 
> numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks.  
> One of the clients may end up with both state-store tasks, while the other 
> has none.
> Further to this, standby task configuration is currently "all or nothing". It 
> might make sense to allow more fine-grained configuration, i.e., the ability 
> to specify the number of standby replicas individually for each stateful 
> operator.





[jira] [Resolved] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning

2018-06-14 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6975.

   Resolution: Fixed
Fix Version/s: (was: 1.1.1)
   (was: 1.0.2)

> AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
> -
>
> Key: KAFKA-6975
> URL: https://issues.apache.org/jira/browse/KAFKA-6975
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Blocker
> Fix For: 2.0.0
>
>
> AdminClient.deleteRecords(beforeOffset(offset)) will set log start offset to 
> the requested offset. If the requested offset is in the middle of the batch, 
> the replica will not be able to fetch from that offset (because it is in the 
> middle of the batch). 
> One use-case where this could cause problems is replica re-assignment. 
> Suppose we have a topic partition with 3 initial replicas, and at some point 
> the user issues  AdminClient.deleteRecords() for the offset that falls in the 
> middle of the batch. It now becomes log start offset for this topic 
> partition. Suppose at some later time, the user starts partition 
> re-assignment to 3 new replicas. The new replicas (followers) will start with 
> HW = 0, will try to fetch from 0, then get "out of order offset" because 0 < 
> log start offset (LSO); the follower will be able to reset offset to LSO of 
> the leader and fetch LSO; the leader will send a batch in response with base 
> offset < LSO, which causes an "out of order offset" error on the follower and 
> stops the fetcher thread. The end result is that the new replicas will not be 
> able to start fetching unless LSO moves to an offset that is not in the 
> middle of the batch, and the re-assignment will be stuck for a possibly 
> very long time. 





[jira] [Commented] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required

2018-06-14 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512623#comment-16512623
 ] 

Rajini Sivaram commented on KAFKA-6806:
---

Thanks [~rhauch]

> Unable to validate sink connectors without "topics" component which is not 
> required
> ---
>
> Key: KAFKA-6806
> URL: https://issues.apache.org/jira/browse/KAFKA-6806
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
> Environment: CP4.1, Centos7
>Reporter: Ivan Majnarić
>Priority: Major
>
> The bug happens when you try to create a new connector through, for example, 
> kafka-connect-ui.
> Previously, both source and sink connectors could be validated through REST 
> without "topics", using only "connector.class", like this:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": 
> "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
> }{code}
> In the new version, CP4.1, you can still validate *source connectors* but 
> not *sink connectors*. To validate a sink connector you need to add the 
> "topics" config to the request, like:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": 
> "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
> "topics": "test-topic"
> }{code}
> So there is a small mismatch in how the two connector types are validated, 
> which I think was introduced accidentally.





[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512618#comment-16512618
 ] 

Gunnar Morling commented on KAFKA-7058:
---

Thanks for the quick review, [~rhauch]. It's definitely not a blocker for us; 
we worked around it for now by having our own equals() implementation for 
schemas.

I'm curious about the Kafka versions maintained going forward, though. Will it 
be 0.10.x and 2.0, but not 1.x?

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Gunnar Morling
>Priority: Major
>
> {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' 
> default values, but this doesn't work correctly if the default values are in 
> fact arrays. In this case, {{false}} is returned for two distinct array 
> instances, even if their contents are the same.
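
The behaviour described above can be shown with the JDK alone. Objects.equals delegates to Object#equals, which for arrays compares identity, while Objects.deepEquals and Arrays.equals compare contents. The sketch below is only an illustration; the class and the sameDefault helper are hypothetical, and it is not necessarily how the issue was or will be fixed in Kafka Connect.

```java
import java.util.Arrays;
import java.util.Objects;

class ArrayDefaultEquality {
    // Hypothetical helper: content-aware comparison of two default values.
    // Objects.deepEquals handles nulls, non-arrays, and arrays of any type.
    static boolean sameDefault(Object a, Object b) {
        return Objects.deepEquals(a, b);
    }

    public static void main(String[] args) {
        Object a = new byte[] {1, 2, 3};
        Object b = new byte[] {1, 2, 3};

        // Identity comparison for arrays: two distinct instances are not equal.
        System.out.println(Objects.equals(a, b));                    // false
        // Content comparison: same length and same elements.
        System.out.println(sameDefault(a, b));                       // true
        System.out.println(Arrays.equals((byte[]) a, (byte[]) b));   // true
    }
}
```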





[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task

2018-06-14 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512615#comment-16512615
 ] 

Richard Yu commented on KAFKA-6583:
---

[~mjsax] Did you finish the upgrade path for metadata? If so, we probably could 
restart this PR.

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> For clients to be more evenly balanced, stateful tasks should be spread 
> equally across them. However, making task assignment aware of this requires 
> the rebalance protocol metadata to also contain the number of state stores 
> in each task. This would allow us to "weight" tasks during assignment. 
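
To make the idea of "weighting" tasks concrete, here is a minimal Java sketch of a greedy assignment that uses the number of state stores per task as the weight. It is an illustration under stated assumptions only: the class, the assign method, and the task and client names are hypothetical, and this is not the assignor Kafka Streams uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WeightedAssignmentSketch {
    // Hypothetical model: storesPerTask maps a task id to its number of state stores.
    static Map<String, List<String>> assign(Map<String, Integer> storesPerTask,
                                            List<String> clients) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        Map<String, Integer> load = new LinkedHashMap<>();
        for (String c : clients) {
            assignment.put(c, new ArrayList<>());
            load.put(c, 0);
        }
        List<Map.Entry<String, Integer>> tasks = new ArrayList<>(storesPerTask.entrySet());
        // Place the heaviest tasks first so they are spread before the light ones.
        tasks.sort(Map.Entry.<String, Integer>comparingByValue().reversed());
        for (Map.Entry<String, Integer> task : tasks) {
            // Choose the client with the smallest state-store load;
            // break ties by the number of tasks already assigned.
            String target = clients.stream()
                    .min(Comparator.<String>comparingInt(load::get)
                            .thenComparingInt(c -> assignment.get(c).size()))
                    .get();
            assignment.get(target).add(task.getKey());
            load.merge(target, task.getValue(), Integer::sum);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Four tasks, two of them stateful, and two clients.
        Map<String, Integer> stores = new LinkedHashMap<>();
        stores.put("0_0", 1);
        stores.put("0_1", 1);
        stores.put("0_2", 0);
        stores.put("0_3", 0);
        System.out.println(assign(stores, Arrays.asList("clientA", "clientB")));
    }
}
```

With this weighting, each of the two clients ends up with one stateful and one stateless task, whereas an assignment that only counts tasks could place both stateful tasks on the same client.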





[jira] [Comment Edited] (KAFKA-6583) Metadata should include number of state stores for task

2018-06-14 Thread Richard Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512615#comment-16512615
 ] 

Richard Yu edited comment on KAFKA-6583 at 6/14/18 3:31 PM:


[~mjsax] Did you finish the upgrade path for metadata? If so, we probably could 
restart this PR.


was (Author: yohan123):
[~mjsax] Did you finish the upgrade path for metadata? If so, we probably could 
start restart this PR.

> Metadata should include number of state stores for task
> ---
>
> Key: KAFKA-6583
> URL: https://issues.apache.org/jira/browse/KAFKA-6583
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Richard Yu
>Priority: Critical
>  Labels: needs-kip
>
> For clients to be more evenly balanced, stateful tasks should be spread 
> equally across them. However, making task assignment aware of this requires 
> the rebalance protocol metadata to also contain the number of state stores 
> in each task. This would allow us to "weight" tasks during assignment. 





[jira] [Commented] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512609#comment-16512609
 ] 

ASF GitHub Bot commented on KAFKA-6975:
---

hachikuji closed pull request #5133: KAFKA-6975: Fix fetching from 
non-batch-aligned log start offset
URL: https://github.com/apache/kafka/pull/5133
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index b9180a45378..55f870e96f7 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.LeaderAndIsr
 import kafka.api.Request
+import kafka.common.UnexpectedAppendOffsetException
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogConfig}
 import kafka.metrics.KafkaMetricsGroup
@@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import kafka.zk.AdminZkClient
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{NotEnoughReplicasException, 
NotLeaderForPartitionException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ReplicaNotAvailableException, 
NotEnoughReplicasException, NotLeaderForPartitionException, 
PolicyViolationException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.record.MemoryRecords
@@ -187,6 +188,10 @@ class Partition(val topic: String,
 
   def getReplica(replicaId: Int = localBrokerId): Option[Replica] = 
Option(allReplicasMap.get(replicaId))
 
+  def getReplicaOrException(replicaId: Int = localBrokerId): Replica =
+getReplica(replicaId).getOrElse(
+  throw new ReplicaNotAvailableException(s"Replica $replicaId is not 
available for partition $topicPartition"))
+
   def leaderReplicaIfLocal: Option[Replica] =
 leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)
 
@@ -545,15 +550,41 @@ class Partition(val topic: String,
 laggingReplicas
   }
 
-  def appendRecordsToFutureReplica(records: MemoryRecords) {
-getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records)
+  private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = {
+  if (isFuture)
+getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records)
+  else {
+// The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
+// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
+inReadLock(leaderIsrUpdateLock) {
+   getReplicaOrException().log.get.appendAsFollower(records)
+}
+  }
   }
 
-  def appendRecordsToFollower(records: MemoryRecords) {
-// The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread
-// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica.
-inReadLock(leaderIsrUpdateLock) {
-  getReplica().get.log.get.appendAsFollower(records)
+  def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) {
+try {
+  doAppendRecordsToFollowerOrFutureReplica(records, isFuture)
+} catch {
+  case e: UnexpectedAppendOffsetException =>
+val replica = if (isFuture) getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException()
+val logEndOffset = replica.logEndOffset.messageOffset
+if (logEndOffset == replica.logStartOffset &&
+e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) {
+  // This may happen if the log start offset on the leader (or current replica) falls in
+  // the middle of the batch due to delete records request and the follower tries to
+  // fetch its first offset from the leader.
+  // We handle this case here instead of Log#append() because we will need to remove the
+  // segment that start with log start offset and create a new one with earlier offset
+  // (base offset of the batch), which will move recoveryPoint backwards, so we will need
+  // to checkpoint the new recovery point before we append
+  val replicaName = if (isFuture) "future replica" else "follower"
+  info(s"Unexpected offset in append to $topicPartition. First offset 
${e.firstOffset} is less than log start offset 

[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Randall Hauch (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512606#comment-16512606
 ] 

Randall Hauch commented on KAFKA-7058:
--

[~gunnar.morling] thanks for the PR. It looks good, but we're a few days past 
code freeze and even though this is a small change I'm not sure we can call 
this a real blocker. I'm going to approve it, but we'll probably have to wait 
until after the 2.0 release to merge. The good news is that this should 
backport pretty cleanly back to the 0.10 branch, and so it would be in the next 
patch release on the various branches whenever they occur.

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Gunnar Morling
>Priority: Major
>
> {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' 
> default values, but this doesn't work correctly if the default values in fact 
> are arrays. In this case, always {{false}} will be returned, also if the 
> default value arrays actually are the same.
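
The behaviour described above can be reproduced with plain JDK calls. The following is only a minimal sketch and not the ConnectSchema code itself; the class name ArrayDefaultEquality and the sample values are made up for illustration. Objects.equals() ends up calling Object.equals(), which for arrays compares references, whereas Objects.deepEquals() compares array contents.

```java
import java.util.Objects;

// Hypothetical demo class; not part of Kafka.
final class ArrayDefaultEquality {
    // For arrays this is a reference comparison, so two distinct
    // arrays with the same contents are reported as different.
    static boolean shallowEquals(Object a, Object b) {
        return Objects.equals(a, b);
    }

    // Compares array contents; for non-array arguments it behaves
    // like Objects.equals().
    static boolean deepEquals(Object a, Object b) {
        return Objects.deepEquals(a, b);
    }

    public static void main(String[] args) {
        byte[] first = {1, 2, 3};
        byte[] second = {1, 2, 3};
        System.out.println(shallowEquals(first, second)); // false
        System.out.println(deepEquals(first, second));    // true
    }
}
```

For non-array default values both calls give the same answer, so switching to the deep comparison should only change the outcome for array-typed defaults.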





[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-7058:
-
Affects Version/s: 1.0.0
   1.1.0

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Gunnar Morling
>Priority: Major
>
> {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' 
> default values, but this doesn't work correctly if the default values in fact 
> are arrays. In this case, always {{false}} will be returned, also if the 
> default value arrays actually are the same.





[jira] [Comment Edited] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required

2018-06-14 Thread Randall Hauch (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512586#comment-16512586
 ] 

Randall Hauch edited comment on KAFKA-6806 at 6/14/18 3:10 PM:
---

[~rsivaram], this is strictly speaking not a blocker, and since we're past code 
freeze we should push it. The fix is not that trivial, either, so it would be 
unnecessary risk for the release.

[~iMajna], I have an idea that would make the fix quite straightforward, but it 
will require a very simple KIP to add a default method to the Validator that 
takes the whole configuration and which by default delegates to the current 
method. ConfigDef could then call this new method, but since this would be 
backward compatible no existing implementations would need to change. However, 
in cases like this we could implement the new method and get the behavior we 
want.

I'll start working on the KIP, but it will have to wait until 2.1. 
Unfortunately we just missed the deadline by a few days to fix this the hard 
way. :-(


was (Author: rhauch):
[~rsivaram], this is strictly speaking not a blocker, and since we're past code 
freeze we should push it. The fix is not that trivial, either, so it would be 
unnecessary risk for the release.

[~iMajna], I have an idea that would make the fix quite straightforward, but it 
will require a very simple KIP to add a default method to the Validator that 
takes the whole configuration and which by default delegates to the current 
method. ConfigDef could then call this new method, but since this would be 
backward compatible no existing implementations would need to change. However, 
in cases like this we could implement the new method and get the behavior we 
want.

> Unable to validate sink connectors without "topics" component which is not 
> required
> ---
>
> Key: KAFKA-6806
> URL: https://issues.apache.org/jira/browse/KAFKA-6806
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
> Environment: CP4.1, Centos7
>Reporter: Ivan Majnarić
>Priority: Major
>
> The bug is happening when you try to create new connector through for example 
> kafka-connect-ui.
> While both source and sink connectors were able to be validated through REST 
> without "topics" as add-on with "connector.class" like this:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
> }{code}
> In the new version of CP4.1 you still can validate *source connectors* but 
> not *sink connectors*. If you want to validate sink connectors you need to 
> add to request -> "topics" config, like:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": 
> "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
> "topics": "test-topic"
> }{code}
> So there is a small mismatch between how source and sink connectors are 
> validated, which I think was introduced accidentally.





[jira] [Updated] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required

2018-06-14 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6806:
-
 Priority: Major  (was: Blocker)
Fix Version/s: (was: 1.1.1)
   (was: 2.0.0)

[~rsivaram], this is strictly speaking not a blocker, and since we're past code 
freeze we should push it. The fix is not that trivial, either, so it would be 
unnecessary risk for the release.

[~iMajna], I have an idea that would make the fix quite straightforward, but it 
will require a very simple KIP to add a default method to the Validator that 
takes the whole configuration and which by default delegates to the current 
method. ConfigDef could then call this new method, but since this would be 
backward compatible no existing implementations would need to change. However, 
in cases like this we could implement the new method and get the behavior we 
want.
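
The idea in the comment above could look roughly like the following. This is a sketch under stated assumptions, not the eventual KIP: SketchValidator, NonEmptyValidator and TopicsValidator are hypothetical names, and the rule that "topics" may be empty when "topics.regex" is set is used only to illustrate a check that needs the whole configuration.

```java
import java.util.Map;

// Hypothetical stand-in for a config validator; not the real Kafka interface.
interface SketchValidator {
    // Existing style of method: sees a single value only.
    void ensureValid(String name, Object value);

    // Proposed style of method: sees the whole configuration and, by
    // default, delegates to the single-value method.
    default void ensureValid(String name, Object value, Map<String, Object> allConfigs) {
        ensureValid(name, value);
    }
}

// An existing implementation keeps compiling and behaving as before.
final class NonEmptyValidator implements SketchValidator {
    @Override
    public void ensureValid(String name, Object value) {
        if (value == null || value.toString().trim().isEmpty()) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
    }
}

// A new implementation can override the whole-configuration method.
final class TopicsValidator implements SketchValidator {
    @Override
    public void ensureValid(String name, Object value) {
        if (isBlank(value)) {
            throw new IllegalArgumentException(name + " must not be empty");
        }
    }

    @Override
    public void ensureValid(String name, Object value, Map<String, Object> allConfigs) {
        // Illustrative rule only: accept an empty value when another key is set.
        if (isBlank(value) && isBlank(allConfigs.get("topics.regex"))) {
            throw new IllegalArgumentException(name + " or topics.regex must be set");
        }
    }

    private static boolean isBlank(Object v) {
        return v == null || v.toString().trim().isEmpty();
    }
}
```

A caller that always invokes the three-argument method gets the old behaviour from NonEmptyValidator through the default method and the context-aware behaviour from TopicsValidator, which is the backward-compatibility property the comment relies on.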

> Unable to validate sink connectors without "topics" component which is not 
> required
> ---
>
> Key: KAFKA-6806
> URL: https://issues.apache.org/jira/browse/KAFKA-6806
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
> Environment: CP4.1, Centos7
>Reporter: Ivan Majnarić
>Priority: Major
>
> The bug is happening when you try to create new connector through for example 
> kafka-connect-ui.
> While both source and sink connectors were able to be validated through REST 
> without "topics" as add-on with "connector.class" like this:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
> }{code}
> In the new version of CP4.1 you still can validate *source connectors* but 
> not *sink connectors*. If you want to validate sink connectors you need to 
> add to request -> "topics" config, like:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": 
> "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
> "topics": "test-topic"
> }{code}
> So there is a small mismatch between how source and sink connectors are 
> validated, which I think was introduced accidentally.





[jira] [Commented] (KAFKA-7059) Offer new constructor on ProducerRecord

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512572#comment-16512572
 ] 

ASF GitHub Bot commented on KAFKA-7059:
---

matzew opened a new pull request #5228: KAFKA-7059 Offer new constructor on 
ProducerRecord
URL: https://github.com/apache/kafka/pull/5228
 
 
   Signed-off-by: Matthias Wessendorf 
   
   Done for https://issues.apache.org/jira/browse/KAFKA-7059 
   
   When developers are creating a ProducerRecord, with custom headers, it 
currently requires the usage of a constructor with a slightly longer arguments 
list.
   
   This is OK, but it would be handy or more convenient if there was a ctor, 
like:
   
   ```java
   public ProducerRecord(String topic, K key, V value, Iterable<Header> headers)
   ```
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Offer new constructor on ProducerRecord 
> 
>
> Key: KAFKA-7059
> URL: https://issues.apache.org/jira/browse/KAFKA-7059
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Matthias Weßendorf
>Priority: Trivial
> Fix For: 2.0.1
>
>
> creating a ProducerRecord, with custom headers requires usage of a 
> constructor with a slightly longer arguments list.
>  
> It would be handy or more convenient if there was a ctor, like:
> {code}
> public ProducerRecord(String topic, K key, V value, Iterable<Header> headers)
> {code}
>  





[jira] [Created] (KAFKA-7059) Offer new constructor on ProducerRecord

2018-06-14 Thread JIRA
Matthias Weßendorf created KAFKA-7059:
-

 Summary: Offer new constructor on ProducerRecord 
 Key: KAFKA-7059
 URL: https://issues.apache.org/jira/browse/KAFKA-7059
 Project: Kafka
  Issue Type: Improvement
Reporter: Matthias Weßendorf
 Fix For: 2.0.1


creating a ProducerRecord, with custom headers requires usage of a constructor 
with a slightly longer arguments list.

 

It would be handy or more convenient if there was a ctor, like:

{code}
public ProducerRecord(String topic, K key, V value, Iterable<Header> headers)
{code}
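
A convenience constructor of this shape would normally just delegate to the longer one. The sketch below shows that pattern on a hypothetical SketchRecord class (not ProducerRecord itself); headers are represented as plain strings, and passing null for the partition is an assumption made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record type; not the Kafka ProducerRecord class.
final class SketchRecord<K, V> {
    final String topic;
    final Integer partition; // null means "no explicit partition" in this sketch
    final K key;
    final V value;
    final List<String> headers;

    // Longer constructor: the caller must supply a partition argument
    // even when it only wants to attach headers.
    SketchRecord(String topic, Integer partition, K key, V value, Iterable<String> headers) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        List<String> copy = new ArrayList<>();
        if (headers != null) {
            for (String header : headers) {
                copy.add(header);
            }
        }
        this.headers = copy;
    }

    // Convenience constructor in the shape proposed above: delegates
    // to the longer constructor with no explicit partition.
    SketchRecord(String topic, K key, V value, Iterable<String> headers) {
        this(topic, null, key, value, headers);
    }
}
```

With this overload a caller can write new SketchRecord<>("topic", "key", "value", headers) instead of passing an explicit null for the partition.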
 





[jira] [Updated] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned

2018-06-14 Thread Magesh kumar Nandakumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Magesh kumar Nandakumar updated KAFKA-7039:
---
Priority: Blocker  (was: Critical)

> DelegatingClassLoader creates plugin instance even if its not Versioned
> ---
>
> Key: KAFKA-7039
> URL: https://issues.apache.org/jira/browse/KAFKA-7039
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Blocker
> Fix For: 2.0.0
>
>
> The versioned interface was introduced as part of 
> [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin].
>  DelegatingClassLoader is now attempting to create an instance of all the 
> plugins, even if it's not required.
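
One way to avoid the unnecessary instantiation is to check the class against the interface before creating an instance. The following is a sketch only and not the DelegatingClassLoader code or its actual fix; SketchVersioned, VersionedPlugin, PlainPlugin and VersionScanner are hypothetical names, and "undefined" is just a placeholder version string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the versioned interface mentioned above.
interface SketchVersioned {
    String version();
}

final class VersionedPlugin implements SketchVersioned {
    static int instances = 0; // counts constructor calls for the demo
    VersionedPlugin() { instances++; }
    @Override
    public String version() { return "1.0"; }
}

final class PlainPlugin {
    static int instances = 0; // counts constructor calls for the demo
    PlainPlugin() { instances++; }
}

final class VersionScanner {
    // Instantiates a plugin class only when it implements SketchVersioned;
    // other classes are reported with a placeholder and never constructed.
    static List<String> versions(List<Class<?>> pluginClasses) {
        List<String> result = new ArrayList<>();
        for (Class<?> klass : pluginClasses) {
            if (SketchVersioned.class.isAssignableFrom(klass)) {
                try {
                    SketchVersioned plugin =
                        (SketchVersioned) klass.getDeclaredConstructor().newInstance();
                    result.add(klass.getSimpleName() + ":" + plugin.version());
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Could not instantiate " + klass, e);
                }
            } else {
                result.add(klass.getSimpleName() + ":undefined");
            }
        }
        return result;
    }
}
```

The type check works on the Class object alone, so a plugin whose constructor has side effects or fails is left untouched unless its version is actually needed.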





[jira] [Updated] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned

2018-06-14 Thread Magesh kumar Nandakumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Magesh kumar Nandakumar updated KAFKA-7039:
---
Fix Version/s: (was: 2.1.0)
   2.0.0

> DelegatingClassLoader creates plugin instance even if its not Versioned
> ---
>
> Key: KAFKA-7039
> URL: https://issues.apache.org/jira/browse/KAFKA-7039
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Blocker
> Fix For: 2.0.0
>
>
> The versioned interface was introduced as part of 
> [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin].
>  DelegatingClassLoader is now attempting to create an instance of all the 
> plugins, even if it's not required.





[jira] [Commented] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned

2018-06-14 Thread Magesh kumar Nandakumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512493#comment-16512493
 ] 

Magesh kumar Nandakumar commented on KAFKA-7039:


[~rsivaram] I think this should be marked Blocker for 2.0 since without this 
some of the connectors could be broken depending on how the plugin path is 
configured.

> DelegatingClassLoader creates plugin instance even if its not Versioned
> ---
>
> Key: KAFKA-7039
> URL: https://issues.apache.org/jira/browse/KAFKA-7039
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Critical
> Fix For: 2.0.0
>
>
> The versioned interface was introduced as part of 
> [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin].
>  DelegatingClassLoader is now attempting to create an instance of all the 
> plugins, even if it's not required.





[jira] [Resolved] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-6711.
---
   Resolution: Fixed
 Reviewer: Matthias J. Sax
Fix Version/s: 2.0.0

> GlobalStateManagerImpl should not write offsets of in-memory stores in 
> checkpoint file
> --
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Assignee: Cemalettin Koç
>Priority: Major
>  Labels: newbie
> Fix For: 2.0.0
>
>
> We are using an InMemoryStore along with GlobalKTable and I noticed that 
> after each restart I am losing all my data. When I debugged it, I found that the 
> `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains the 
> offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl 
> implementation and noticed that it is not guarded against cases like mine. 
> I am fairly new to Kafka land, so there might be another way to fix the 
> issue. 
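
A guard of the kind the report asks for could filter the offsets before they are written. The sketch below is an assumption-laden illustration, not the GlobalStateManagerImpl code: SketchTopicPartition and CheckpointFilter are hypothetical names, and the set of topics backing in-memory stores is assumed to be known up front. The reasoning is that an in-memory store is empty after a restart, so a checkpointed offset for its topic would presumably make restoration skip the earlier records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a topic/partition pair.
final class SketchTopicPartition {
    final String topic;
    final int partition;

    SketchTopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchTopicPartition)) {
            return false;
        }
        SketchTopicPartition other = (SketchTopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

final class CheckpointFilter {
    // Returns a copy of the offsets without the entries whose topic
    // backs an in-memory (non-persistent) store.
    static Map<SketchTopicPartition, Long> persistentOnly(
            Map<SketchTopicPartition, Long> offsets,
            Set<String> inMemoryStoreTopics) {
        Map<SketchTopicPartition, Long> filtered = new HashMap<>();
        for (Map.Entry<SketchTopicPartition, Long> entry : offsets.entrySet()) {
            if (!inMemoryStoreTopics.contains(entry.getKey().topic)) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

Only the filtered map would then be handed to whatever writes the checkpoint file, so topics of in-memory stores are always restored from the beginning.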





[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512359#comment-16512359
 ] 

ASF GitHub Bot commented on KAFKA-6711:
---

rajinisivaram closed pull request #5219: KAFKA-6711: GlobalStateManagerImpl 
should not write offsets of in-memory stores in checkpoint file
URL: https://github.com/apache/kafka/pull/5219
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 78c4a363f29..a4ec23d4c49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -42,6 +42,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -62,6 +63,7 @@
 private final int retries;
 private final long retryBackoffMs;
 private final Duration pollTime;
+private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
 
 public GlobalStateManagerImpl(final LogContext logContext,
   final ProcessorTopology topology,
@@ -71,6 +73,14 @@ public GlobalStateManagerImpl(final LogContext logContext,
   final StreamsConfig config) {
 super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
+// Find non persistent store's topics
+final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+for (final StateStore store : topology.globalStateStores()) {
+if (!store.persistent()) {
+globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+}
+}
+
 this.log = logContext.logger(GlobalStateManagerImpl.class);
 this.topology = topology;
 this.globalConsumer = globalConsumer;
@@ -337,13 +347,22 @@ public void close(final Map<TopicPartition, Long> offsets) throws IOException {
 @Override
 public void checkpoint(final Map<TopicPartition, Long> offsets) {
 checkpointableOffsets.putAll(offsets);
-if (!checkpointableOffsets.isEmpty()) {
-try {
-checkpoint.write(checkpointableOffsets);
-} catch (final IOException e) {
-log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
+
+final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+// Skip non persistent store
+for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+final String topic = topicPartitionOffset.getKey().topic();
+if (!globalNonPersistentStoresTopics.contains(topic)) {
+filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
 }
 }
+
+try {
+checkpoint.write(filteredOffsets);
+} catch (final IOException e) {
+log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
+}
 }
 
 @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 900e65276ee..013e2b68110 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -22,13 +22,13 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -38,6 +38,7 @@
 import org.apache.kafka.streams.state.KeyValueStore;
 import 

[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512270#comment-16512270
 ] 

ASF GitHub Bot commented on KAFKA-7058:
---

gunnarmorling opened a new pull request #5225: KAFKA-7058 Comparing schema 
default values using Objects#deepEquals()
URL: https://github.com/apache/kafka/pull/5225
 
 
   https://issues.apache.org/jira/browse/KAFKA-7058
   * Summary of testing strategy: Added new unit test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' 
> default values, but this doesn't work correctly if the default values in fact 
> are arrays. In this case, always {{false}} will be returned, also if the 
> default value arrays actually are the same.





[jira] [Commented] (KAFKA-7015) Enhance RecordCollectorImpl exceptions with more context information

2018-06-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512266#comment-16512266
 ] 

ASF GitHub Bot commented on KAFKA-7015:
---

jadireddi opened a new pull request #5224: KAFKA-7015: Fixed 
RecordCollectorImpl exception messages with more human readable context info.
URL: https://github.com/apache/kafka/pull/5224
 
 
   https://issues.apache.org/jira/browse/KAFKA-7015
   Updated the `RecordCollectorImpl` class so that its exception messages show 
the key/value pair in a human-readable format.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Enhance RecordCollectorImpl exceptions with more context information  
> -
>
> Key: KAFKA-7015
> URL: https://issues.apache.org/jira/browse/KAFKA-7015
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Jagadesh Adireddi
>Priority: Minor
>
> In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and 
> only have concrete key/value types on outer layers/wrappers of the stores.
> For this reason, the innermost {{RocksDBStore}} cannot provide useful error 
> messages anymore if a put/get/delete operation fails as it only handles plain 
> bytes.
> In addition, the corresponding calls that send changelog records to the record 
> collector also pass byte arrays only, and hence when an error happens, 
> the record collector can only display the key but not the 
> value since it is all bytes:
> {code:java}
> [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl   -
> task [2_2] Error sending record (key {"eventId":XXX,"version":123}
> value [] timestamp YYY) to topic TTT
> due to ...
> {code}
> The store exceptions got fixed via KAFKA-6538.
> This Jira is to track the fix for RecordCollectorImpl.





[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-7058:
--
Component/s: KafkaConnect

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> {ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default 
> values, but this doesn't work correctly if the default values in fact are 
> arrays. In this case, always {false} will be returned, also if the default 
> value arrays actually are the same.





[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gunnar Morling updated KAFKA-7058:
--
Description: {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the 
schemas' default values, but this doesn't work correctly if the default values 
in fact are arrays. In this case, always {{false}} will be returned, also if 
the default value arrays actually are the same.  (was: {ConnectSchema#equals()} 
calls {{Objects#equals()}} for the schemas' default values, but this doesn't 
work correctly if the default values in fact are arrays. In this case, always 
{false} will be returned, also if the default value arrays actually are the 
same.)

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' 
> default values, but this doesn't work correctly if the default values in fact 
> are arrays. In this case, always {{false}} will be returned, also if the 
> default value arrays actually are the same.





[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512261#comment-16512261
 ] 

Gunnar Morling commented on KAFKA-7058:
---

I'll send a pull request for this in a bit.

> ConnectSchema#equals() broken for array-typed default values
> 
>
> Key: KAFKA-7058
> URL: https://issues.apache.org/jira/browse/KAFKA-7058
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gunnar Morling
>Priority: Major
>
> {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' 
> default values, but this doesn't work correctly if the default values in fact 
> are arrays. In this case, always {{false}} will be returned, also if the 
> default value arrays actually are the same.





[jira] [Created] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values

2018-06-14 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-7058:
-

 Summary: ConnectSchema#equals() broken for array-typed default 
values
 Key: KAFKA-7058
 URL: https://issues.apache.org/jira/browse/KAFKA-7058
 Project: Kafka
  Issue Type: Bug
Reporter: Gunnar Morling


{ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default 
values, but this doesn't work correctly if the default values in fact are 
arrays. In this case, always {false} will be returned, also if the default 
value arrays actually are the same.





[jira] [Updated] (KAFKA-7057) Consumer stop polling

2018-06-14 Thread Moshe Lavi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Moshe Lavi updated KAFKA-7057:
--
Description: 
We set up 3 Kafka brokers (version 0.10.1.1) and use a Spring Cloud Stream 
consumer to poll messages.
 We were alerted to consumer lag and found that some consumers were blocked and 
no longer polling messages. To recover, we have to restart the microservice 
where the affected consumer resides.

I wonder if this has to do with a lack of available threads or with the 
heartbeat daemon not existing or not working.

*The thread dump shows:*

kafka-coordinator-heartbeat-thread | SiteAgreementItem" #4943 daemon prio=5 os_prio=0 tid=0x7f3abdd08000 nid=0x83ac waiting for monitor entry [0x7f3a5dcdb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409)
    - waiting to lock <*0x0005df800450*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865)
    - locked <0x0005df800488> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

-kafka-consumer-1" #4940 prio=5 os_prio=0 tid=0x7f3a8d433800 nid=0x838e runnable [0x7f3a5dedd000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x0005df7705e0> (a sun.nio.ch.Util$2)
    - locked <0x0005df7705d0> (a java.util.Collections$UnmodifiableSet)
    - locked <0x0005df7705f0> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.kafka.common.network.Selector.select(Selector.java:470)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    - locked <*0x0005df800450*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:532)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

  was:
We build 3 Kafka brokers (0.10.1.1) version using Spring Cloud Stream consumer 
to poll messages.
We encountered consumer lags alerted and found some consumers were blocked and 
not polling anymore messages. This requires us to restart the microservice 
where that consumer resides.

I wonder if this has to do with lack of available threads or to the fact there 
heartbeat daemon does not exist/work.


*The thread dump shows:*

kafka-coordinator-heartbeat-thread | SiteAgreementItem" #4943 daemon prio=5 
os_prio=0 tid=0x7f3abdd08000 nid=0x83ac waiting for monitor entry 
[0x7f3a5dcdb000]

   java.lang.Thread.State: BLOCKED (on object monitor)

    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409)

    - waiting to lock <*0x0005df800450*> (a 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)

    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264)

    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865)

    - locked <0x0005df800488> (a 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

 

-kafka-consumer-1" #4940 prio=5 os_prio=0 tid=0x7f3a8d433800 nid=0x838e 
runnable [0x7f3a5dedd000]

   java.lang.Thread.State: RUNNABLE

    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)

    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)

  

[jira] [Created] (KAFKA-7057) Consumer stop polling

2018-06-14 Thread Moshe Lavi (JIRA)
Moshe Lavi created KAFKA-7057:
-

 Summary: Consumer stop polling
 Key: KAFKA-7057
 URL: https://issues.apache.org/jira/browse/KAFKA-7057
 Project: Kafka
  Issue Type: Bug
  Components: consumer, controller
Affects Versions: 0.10.1.1
Reporter: Moshe Lavi


We set up 3 Kafka brokers (version 0.10.1.1) and use a Spring Cloud Stream 
consumer to poll messages.
We received consumer lag alerts and found that some consumers were blocked and 
no longer polling messages. This requires us to restart the microservice 
where that consumer resides.

I wonder if this has to do with a lack of available threads or with the fact 
that the heartbeat daemon does not exist/work.


*The thread dump shows:*

kafka-coordinator-heartbeat-thread | SiteAgreementItem" #4943 daemon prio=5 os_prio=0 tid=0x7f3abdd08000 nid=0x83ac waiting for monitor entry [0x7f3a5dcdb000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409)
    - waiting to lock <*0x0005df800450*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865)
    - locked <0x0005df800488> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

-kafka-consumer-1" #4940 prio=5 os_prio=0 tid=0x7f3a8d433800 nid=0x838e runnable [0x7f3a5dedd000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x0005df7705e0> (a sun.nio.ch.Util$2)
    - locked <0x0005df7705d0> (a java.util.Collections$UnmodifiableSet)
    - locked <0x0005df7705f0> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
    at org.apache.kafka.common.network.Selector.select(Selector.java:470)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    - locked <*0x0005df800450*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:532)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)
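
In the dump above, the heartbeat thread is BLOCKED waiting for the monitor <0x0005df800450>, which the consumer thread holds while it sits in its poll call. The following is a minimal, self-contained sketch of that pattern using plain JDK threads. It does not use any Kafka classes; the class name, thread names and the latch-based "poll" are assumptions made purely for illustration.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo: one thread holds a monitor for a long time, a second
// thread that needs the same monitor shows up as BLOCKED.
final class BlockedOnMonitorDemo {
    static Thread.State observeWaiterState() {
        final Object client = new Object();               // stands in for the shared client object
        final CountDownLatch lockHeld = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        Thread poller = new Thread(() -> {
            synchronized (client) {
                lockHeld.countDown();
                awaitQuietly(release);                    // simulates a long poll while holding the lock
            }
        }, "poller");

        Thread heartbeat = new Thread(() -> {
            synchronized (client) {
                // Entered only after the poller leaves its synchronized block.
            }
        }, "heartbeat");

        try {
            poller.start();
            lockHeld.await();                             // the poller now owns the monitor
            heartbeat.start();
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (heartbeat.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                Thread.sleep(1);
            }
            return heartbeat.getState();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();                          // let both threads finish
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("heartbeat state: " + observeWaiterState());
    }
}
```

This only reproduces the thread states seen in the dump. It does not show why the poll in the reported setup holds the lock for so long.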





[jira] [Resolved] (KAFKA-4812) We are facing the same issue as SAMZA-590 for kafka

2018-06-14 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4812.
--
Resolution: Auto Closed

Closing inactive issue. The old consumer is no longer supported. Please upgrade 
to the Java consumer whenever possible.

> We are facing the same issue as SAMZA-590 for kafka
> ---
>
> Key: KAFKA-4812
> URL: https://issues.apache.org/jira/browse/KAFKA-4812
> Project: Kafka
>  Issue Type: Bug
>Reporter: Manjeer Srujan. Y
>Priority: Critical
>
> Dead Kafka broker ignores new leader.
> We are facing the same issue as the Samza issue linked below, but we couldn't 
> find any fix for this in Kafka. The log is pasted below for reference.
> The Kafka client that we are using is:
> group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.2.1'
> https://issues.apache.org/jira/browse/SAMZA-590
> 2017-02-28 09:50:53.189 29708 [Thread-11-vendor-index-spout-executor[35 35]] 
> ERROR org.apache.storm.daemon.executor -  - java.lang.RuntimeException: 
> java.nio.channels.ClosedChannelException
> at 
> org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103)
> at 
> org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
> at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129)
> at 
> org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
> at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
> at clojure.lang.AFn.run(AFn.java:22)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at 
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at 
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127)
> at 
> kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
> at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75)
> at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65)
> at 
> org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94)
> at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98)
> ... 6 more





[jira] [Updated] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required

2018-06-14 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Majnarić updated KAFKA-6806:
-
Environment: CP4.1, Centos7  (was: CP4.1., Centos7)

> Unable to validate sink connectors without "topics" component which is not 
> required
> ---
>
> Key: KAFKA-6806
> URL: https://issues.apache.org/jira/browse/KAFKA-6806
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
> Environment: CP4.1, Centos7
>Reporter: Ivan Majnarić
>Priority: Blocker
> Fix For: 2.0.0, 1.1.1
>
>
> The bug happens when you try to create a new connector through, for example, 
> kafka-connect-ui.
> Previously, both source and sink connectors could be validated through REST 
> without "topics", using only "connector.class", like this:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
> }{code}
> In the new version (CP4.1) you can still validate *source connectors* but 
> not *sink connectors*. To validate a sink connector, you need to add the 
> "topics" config to the request, like:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
>     "topics": "test-topic"
> }{code}
> So there is a small mismatch in how connectors are validated, which I 
> think was introduced accidentally.
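
For anyone who wants to reproduce the two requests from code rather than from a UI, here is a rough sketch using the JDK 11+ HTTP client. The host "connect-url:8083" and the connector class come from the description above; the class and method names are hypothetical, and the body is built by simple string concatenation without JSON escaping, which is only acceptable for these fixed example values.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds (but does not send) the validate request.
final class ValidateRequestSketch {
    // Builds the flat JSON config; "topics" is included only when non-null.
    static String buildBody(String connectorClass, String topics) {
        StringBuilder body = new StringBuilder();
        body.append("{\"connector.class\": \"").append(connectorClass).append("\"");
        if (topics != null) {
            body.append(", \"topics\": \"").append(topics).append("\"");
        }
        return body.append("}").toString();
    }

    // PUT <baseUrl>/connector-plugins/<connectorClass>/config/validate
    static HttpRequest buildValidateRequest(String baseUrl, String connectorClass, String topics) {
        URI uri = URI.create(baseUrl + "/connector-plugins/" + connectorClass + "/config/validate");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(buildBody(connectorClass, topics)))
                .build();
    }

    public static void main(String[] args) {
        String cls = "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector";
        HttpRequest withoutTopics = buildValidateRequest("http://connect-url:8083", cls, null);
        HttpRequest withTopics = buildValidateRequest("http://connect-url:8083", cls, "test-topic");
        System.out.println(withoutTopics.method() + " " + withoutTopics.uri());
        System.out.println(withTopics.method() + " " + withTopics.uri());
    }
}
```

Sending the request (for example with HttpClient.send) is left out on purpose, since the response format is not shown in this thread.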





[jira] [Commented] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required

2018-06-14 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512145#comment-16512145
 ] 

Ivan Majnarić commented on KAFKA-6806:
--

Hi [~rsivaram] and [~lindong],
[~rhauch] set this as a Blocker, so I suppose it should be treated as one.

> Unable to validate sink connectors without "topics" component which is not 
> required
> ---
>
> Key: KAFKA-6806
> URL: https://issues.apache.org/jira/browse/KAFKA-6806
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
> Environment: CP4.1., Centos7
>Reporter: Ivan Majnarić
>Priority: Blocker
> Fix For: 2.0.0, 1.1.1
>
>
> The bug happens when you try to create a new connector through, for example, 
> kafka-connect-ui.
> Previously, both source and sink connectors could be validated through REST 
> without "topics", using only "connector.class", like this:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
> }{code}
> In the new version (CP4.1) you can still validate *source connectors* but 
> not *sink connectors*. To validate a sink connector, you need to add the 
> "topics" config to the request, like:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
>     "topics": "test-topic"
> }{code}
> So there is a small mismatch in how connectors are validated, which I 
> think was introduced accidentally.





[jira] [Updated] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7039:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

[~mageshn] Moving this out to 2.1.0 since it hasn't been merged yet. Feel free 
to include in 2.0.0 and update the fix version if completed in time.

> DelegatingClassLoader creates plugin instance even if its not Versioned
> ---
>
> Key: KAFKA-7039
> URL: https://issues.apache.org/jira/browse/KAFKA-7039
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Critical
> Fix For: 2.1.0
>
>
> The versioned interface was introduced as part of 
> [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin].
>  DelegatingClassLoader is now attempting to create an instance of all the 
> plugins, even if it's not required.





[jira] [Updated] (KAFKA-7028) super.users doesn't work with custom principals

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7028:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since there is no PR yet.

> super.users doesn't work with custom principals
> ---
>
> Key: KAFKA-7028
> URL: https://issues.apache.org/jira/browse/KAFKA-7028
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Priority: Major
> Fix For: 2.1.0
>
>
> SimpleAclAuthorizer creates a KafkaPrincipal for the users defined in the 
> super.users broker config. However, it should use the configured 
> KafkaPrincipalBuilder so that it works with a custom defined one.





[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7045:
--
Fix Version/s: (was: 2.0.0)
   2.0.1

[~dhruvilshah] Moving this out to 2.0.1 since it is not ready yet. If you do get a 
fix ready in time, please include in 2.0.0 and update the fix version.

> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
> Fix For: 2.0.1
>
> Attachments: log-cleaner-test.zip
>
>
> When down-conversion is required, the consumer might fail to consume messages 
> under certain conditions. A couple of such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cleaner. For example, let's say we have the 
> following two segments. The brackets indicate a single batch of messages and 
> the numbers within are the message offsets.
> Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8]
> Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 
> 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
> Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer attempting to fetch messages at offset 7 will start reading the 
> batch starting at offset 6. During down-conversion, we will drop the record 
> at offset 6 because it is less than the current fetch start offset. However, 
> there are no messages in the log following offset 6. In such cases, we return 
> the `FileRecords` itself, which would cause the consumer to throw an exception 
> because it does not understand the stored message format.
> (2) When consuming from a topic with transactional messages, down-conversion 
> usually drops control batches because these do not exist in V0 and V1 message 
> formats. If there are no message batches following the control batch in the 
> particular segment (or if we are at the end of the log), we would again get 
> no records after down-conversion and will return the `FileRecords`. Because 
> the consumer is not able to interpret control batches, it will again throw an 
> exception.
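
Case (1) can be modelled with plain lists of offsets. This is only a toy model of the behaviour described above, not the broker's down-conversion code: the class and method names are hypothetical, batches are lists of offsets, and "down-conversion" is reduced to dropping records below the fetch offset from the batch the fetch starts in.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: a segment is a list of batches, a batch is a list of offsets.
final class DownConversionSketch {
    // Picks the last batch whose first offset is <= fetchOffset, then keeps
    // only the offsets at or after fetchOffset.
    static List<Long> downConvert(List<List<Long>> segment, long fetchOffset) {
        List<Long> batch = List.of();
        for (List<Long> candidate : segment) {
            if (!candidate.isEmpty() && candidate.get(0) <= fetchOffset) {
                batch = candidate;
            }
        }
        List<Long> kept = new ArrayList<>();
        for (long offset : batch) {
            if (offset >= fetchOffset) {
                kept.add(offset);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<List<Long>> beforeCleaning = List.of(List.of(0L, 1L, 2L), List.of(3L, 4L, 5L), List.of(6L, 7L, 8L));
        List<List<Long>> afterCleaning = List.of(List.of(0L, 1L, 2L), List.of(3L, 4L, 5L), List.of(6L));
        System.out.println("before cleaning: " + downConvert(beforeCleaning, 7));
        System.out.println("after cleaning:  " + downConvert(afterCleaning, 7));
    }
}
```

With the cleaned segment, the result for a fetch at offset 7 is empty, which is the situation in which the description says the un-converted `FileRecords` is returned.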





[jira] [Updated] (KAFKA-6805) Allow dynamic broker configs to be configured in ZooKeeper before starting broker

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6805:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since it is not ready yet.

> Allow dynamic broker configs to be configured in ZooKeeper before starting 
> broker
> -
>
> Key: KAFKA-6805
> URL: https://issues.apache.org/jira/browse/KAFKA-6805
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> At the moment, dynamic broker configs like SSL keystore and password can be 
> configured using ConfigCommand only after a broker is started (using the new 
> AdminClient). To start a broker, these configs have to be defined in 
> server.properties. We want to restrict updates using ZooKeeper once broker 
> starts up, but we should allow updates using ZK prior to starting brokers. 
> This is particularly useful for password configs which are stored encrypted 
> in ZK, making it difficult to set manually before starting brokers.
> ConfigCommand is being updated to talk to AdminClient under KIP-248, but we 
> will need to maintain the tool using ZK to enable credentials to be created 
> in ZK before starting brokers. So the functionality to set broker configs can 
> sit alongside that.
>  





[jira] [Updated] (KAFKA-6697) JBOD configured broker should not die if log directory is invalid

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6697:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

[~lindong] Moving this to 2.1.0 since it is not ready yet. Feel free to include 
in 2.0.0 and update the fix version if completed in time.

> JBOD configured broker should not die if log directory is invalid
> -
>
> Key: KAFKA-6697
> URL: https://issues.apache.org/jira/browse/KAFKA-6697
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently, a JBOD-configured broker will still die on startup if 
> dir.getCanonicalPath() throws an IOException. We should mark such a log 
> directory as offline, and the broker should still run if there is a good disk.
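
The proposed behaviour could look roughly like the sketch below. It is not the broker's startup code: the class, the PathResolver interface and the method names are hypothetical, and PathResolver merely stands in for a call such as dir.getCanonicalPath() so that the failure can be simulated.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: tolerate per-directory failures instead of dying.
final class LogDirStartupSketch {
    // Stand-in for File.getCanonicalPath(), which may throw IOException.
    interface PathResolver {
        String canonicalPath(String dir) throws IOException;
    }

    // Returns the resolved paths of usable directories; directories whose
    // resolution fails are recorded in offlineDirs instead of aborting startup.
    static List<String> resolveLiveDirs(List<String> dirs, PathResolver resolver, List<String> offlineDirs) {
        List<String> live = new ArrayList<>();
        for (String dir : dirs) {
            try {
                live.add(resolver.canonicalPath(dir));
            } catch (IOException e) {
                offlineDirs.add(dir);
            }
        }
        if (live.isEmpty()) {
            // Assumption: with no good disk at all there is nothing left to run on.
            throw new IllegalStateException("No usable log directory");
        }
        return live;
    }
}
```

A caller would pass something like dir -> new java.io.File(dir).getCanonicalPath() as the resolver.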





[jira] [Updated] (KAFKA-6684) Support casting values with bytes schema to string

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6684:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since it is not ready yet. Feel free to include in 
2.0.0 and change the fix version if ready to merge in time.

> Support casting values with bytes schema to string 
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Critical
> Fix For: 2.1.0
>
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is 
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's 
> {{toString()}}, such that if the object is actually a LogicalType it can be 
> "serialized" as string instead of bytes+schema.
>  
> {noformat}
> Examples:
> BigDecimal will cast to the string representation of the number.
> Timestamp will cast to the string representation of the timestamp, or maybe 
> UTC mmddTHH:MM:SS.f format?
> {noformat}
>  
> Worst case, bytes are "cast" to whatever {{toString()}} returns - it's 
> up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.
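
A possible shape for such a cast, written against plain JDK types only. This is a sketch of the idea in the description, not Connect's Cast transformation: the class name is hypothetical, the ISO-8601 UTC output for timestamps is just one of the options the description leaves open, and decoding raw bytes as UTF-8 is an extra assumption that goes beyond calling toString().

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Date;

// Hypothetical "cast anything to string" helper.
final class CastToStringSketch {
    static String castToString(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof BigDecimal) {
            // String representation of the number, scale preserved.
            return ((BigDecimal) value).toString();
        }
        if (value instanceof Date) {
            // ISO-8601 in UTC (assumption; the format is an open question in the issue).
            return Instant.ofEpochMilli(((Date) value).getTime()).toString();
        }
        if (value instanceof byte[]) {
            // Assumption: treat raw bytes as UTF-8 text.
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        // Fallback: whatever toString() returns.
        return value.toString();
    }

    public static void main(String[] args) {
        System.out.println(castToString(new BigDecimal("12.50")));
        System.out.println(castToString(new Date(0L)));
    }
}
```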





[jira] [Commented] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required

2018-06-14 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512110#comment-16512110
 ] 

Rajini Sivaram commented on KAFKA-6806:
---

Hi [~rhauch] [~iMajna] Same question as above for 2.0.0. Is this really a 
blocker?

> Unable to validate sink connectors without "topics" component which is not 
> required
> ---
>
> Key: KAFKA-6806
> URL: https://issues.apache.org/jira/browse/KAFKA-6806
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.1.0
> Environment: CP4.1., Centos7
>Reporter: Ivan Majnarić
>Priority: Blocker
> Fix For: 2.0.0, 1.1.1
>
>
> The bug happens when you try to create a new connector through, for example, 
> kafka-connect-ui.
> Previously, both source and sink connectors could be validated through REST 
> without "topics", using only "connector.class", like this:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
> }{code}
> In the new version (CP4.1) you can still validate *source connectors* but 
> not *sink connectors*. To validate a sink connector, you need to add the 
> "topics" config to the request, like:
> {code:java}
> PUT / 
> http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
> {
>     "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
>     "topics": "test-topic"
> }{code}
> So there is a small mismatch in how connectors are validated, which I 
> think was introduced accidentally.





[jira] [Updated] (KAFKA-6342) Move workaround for JSON parsing of non-escaped strings

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6342:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since it is not ready in time for 2.0.0

> Move workaround for JSON parsing of non-escaped strings
> ---
>
> Key: KAFKA-6342
> URL: https://issues.apache.org/jira/browse/KAFKA-6342
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Umesh Chaudhary
>Priority: Major
> Fix For: 2.1.0
>
>
> KAFKA-6319 added a workaround to parse invalid JSON persisted using older 
> versions of Kafka because special characters were not escaped. The workaround 
> is required in 1.0.1 to enable parsing invalid JSON from ACL configs in 
> ZooKeeper. We can move the workaround out of kafka.utils.Json#parseFull for 
> 1.1.0 so that it is applied only to ACLs.





[jira] [Updated] (KAFKA-7051) Improve the efficiency of the ReplicaManager when there are many partitions

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7051:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

[~cmccabe] Moving this out to 2.1.0 since it is not ready yet. But feel free to 
include in 2.0.0 if reviewed on time.

> Improve the efficiency of the ReplicaManager when there are many partitions
> ---
>
> Key: KAFKA-7051
> URL: https://issues.apache.org/jira/browse/KAFKA-7051
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.0
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
> Fix For: 2.1.0
>
>






[jira] [Updated] (KAFKA-6897) Mirrormaker waits to shut down forever on produce failure with abort.on.send.failure=true

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6897:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since it is not ready for 2.0.0

> Mirrormaker waits to shut down forever on produce failure with 
> abort.on.send.failure=true 
> --
>
> Key: KAFKA-6897
> URL: https://issues.apache.org/jira/browse/KAFKA-6897
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Koelli Mungee
>Assignee: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: mirror_maker_thread_dump.log
>
>
> Mirrormaker never shuts down after a produce failure when 
> abort.on.send.failure=true
> {code:java}
> [2018-05-07 08:29:32,417] ERROR Error when sending message to topic test with 
> key: 52 bytes, value: 32615 bytes with error: 
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> test-11: 45646 ms has passed since last append
> [2018-05-07 08:29:32,434] INFO Closing producer due to send failure. 
> (kafka.tools.MirrorMaker$)
> [2018-05-07 08:29:32,434] INFO [Producer clientId=producer-1] Closing the 
> Kafka producer with timeoutMillis = 0 ms. 
> (org.apache.kafka.clients.producer.KafkaProducer)
> {code}
> A stack trace of this mirrormaker process 9 hours later shows that the main 
> thread is still active and it is waiting for metadata that it will never get 
> since the producer send thread is no longer running.





[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6648:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since it is not ready for 2.0.0.

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
> -
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.1.0
>
>
> {code}
> if (!shouldRetry) {
>    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>    for (String topic : cluster.topics())
>       topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>    return topicsPartitionInfos;
> }
> {code}
> This leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions 
> would be returned, whereas if the metadata doesn't exist (or has expired) only 
> a subset of partitions (the healthy ones) would be returned.
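
The difference between the two views can be shown with a small stand-alone model. The Partition class and both methods below are hypothetical and are not the Kafka Cluster API; the sketch assumes that "available" means "currently has a leader".

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "all partitions" versus "available partitions".
final class PartitionViewSketch {
    static final class Partition {
        final int id;
        final Integer leader; // null when the partition currently has no leader

        Partition(int id, Integer leader) {
            this.id = id;
            this.leader = leader;
        }
    }

    // Every known partition, healthy or not.
    static List<Partition> allPartitions(List<Partition> partitions) {
        return new ArrayList<>(partitions);
    }

    // Only partitions with a leader.
    static List<Partition> availablePartitions(List<Partition> partitions) {
        List<Partition> available = new ArrayList<>();
        for (Partition p : partitions) {
            if (p.leader != null) {
                available.add(p);
            }
        }
        return available;
    }

    public static void main(String[] args) {
        List<Partition> topic = List.of(new Partition(0, 1), new Partition(1, null), new Partition(2, 3));
        System.out.println("all:       " + allPartitions(topic).size());
        System.out.println("available: " + availablePartitions(topic).size());
    }
}
```

A caller that sometimes gets the first view and sometimes the second, depending on whether metadata was cached, sees the partition count change for the same topic.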





[jira] [Updated] (KAFKA-5445) Document exceptions thrown by AdminClient methods

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5445:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since it is not ready for 2.0.0

> Document exceptions thrown by AdminClient methods
> -
>
> Key: KAFKA-5445
> URL: https://issues.apache.org/jira/browse/KAFKA-5445
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients
>Reporter: Ismael Juma
>Assignee: Andrey Dyachkov
>Priority: Major
> Fix For: 1.0.2, 2.1.0
>
>
> AdminClient should document the exceptions that users may have to handle.





[jira] [Updated] (KAFKA-5098) KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5098:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since it is not ready for 2.0.0

> KafkaProducer.send() blocks and generates TimeoutException if topic name has 
> illegal char
> -
>
> Key: KAFKA-5098
> URL: https://issues.apache.org/jira/browse/KAFKA-5098
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
> Environment: Java client running against server using 
> wurstmeister/kafka Docker image.
>Reporter: Jeff Larsen
>Assignee: huxihx
>Priority: Major
> Fix For: 2.1.0
>
>
> The server is running with auto create enabled. If we try to publish to a 
> topic with a forward slash in the name, the call blocks and we get a 
> TimeoutException in the Callback. I would expect it to return immediately 
> with an InvalidTopicException.
> There are other blocking issues that have been reported which may be related 
> to some degree, but this particular cause seems unrelated.
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
>
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "kafka.example.com:9092");
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("max.block.ms", 10000); // 10 seconds should illustrate our point
>     String separator = "/";
>     //String separator = "_";
>     try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>       System.out.println("Calling KafkaProducer.send() at " + new Date());
>       producer.send(
>           new ProducerRecord<String, String>("abc" + separator + "someStreamName",
>               "Not expecting a TimeoutException here"),
>           new Callback() {
>             @Override
>             public void onCompletion(RecordMetadata metadata, Exception e) {
>               if (e != null) {
>                 System.out.println(e.toString());
>               }
>             }
>           });
>       System.out.println("KafkaProducer.send() completed at " + new Date());
>     }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the 
> TimeoutException we encountered did not help point us in the right direction.
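
Until the producer fails fast on its own, a client-side check before calling send() avoids the wait. The sketch below is a hypothetical helper, not a Kafka API. It applies the topic naming rules as I understand them (ASCII letters, digits, '.', '_' and '-', at most 249 characters, and not "." or ".."); treat those limits as an assumption and verify them against the Kafka version in use.

```java
import java.util.regex.Pattern;

// Hypothetical pre-flight check for topic names.
final class TopicNameCheck {
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    static boolean isValid(String topic) {
        if (topic == null || topic.isEmpty()) {
            return false;
        }
        if (topic.equals(".") || topic.equals("..")) {
            return false;
        }
        if (topic.length() > MAX_LENGTH) {
            return false;
        }
        return LEGAL.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("abc/someStreamName"));
        System.out.println(isValid("abc_someStreamName"));
    }
}
```

With the two topic names from the sample code, the slash variant is rejected and the underscore variant is accepted.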





[jira] [Updated] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6780:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since there is no PR yet.

> log cleaner shouldn't clean messages beyond high watermark
> --
>
> Key: KAFKA-6780
> URL: https://issues.apache.org/jira/browse/KAFKA-6780
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Assignee: Anna Povzner
>Priority: Major
> Fix For: 1.1.1, 2.1.0
>
>
> Currently, the firstUncleanableDirtyOffset computed by the log cleaner is 
> bounded by the first offset in the active segment. It's possible for the high 
> watermark to be smaller than that. This may cause a committed record to be 
> removed because of an uncommitted record.
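
The bound described above can be sketched as follows. This is only an illustration of the proposed rule, not the actual log cleaner code; the class, method and parameter names are hypothetical.

```java
// Hypothetical sketch of the proposed bound; not the real LogCleaner implementation.
class CleanerBoundSketch {
    // The cleaner should stop at whichever comes first: the start of the
    // active segment or the high watermark.
    static long firstUncleanableOffset(long activeSegmentBaseOffset, long highWatermark) {
        return Math.min(activeSegmentBaseOffset, highWatermark);
    }
}
```

For example, with an active segment starting at offset 100 and a high watermark of 80, cleaning would stop at 80, so uncommitted records in 80..99 could not cause a committed record to be removed.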





[jira] [Updated] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4950:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out since it is not ready for 2.0.0.

> ConcurrentModificationException when iterating over Kafka Metrics
> -
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.1
>Reporter: Dumitru Postoronca
>Assignee: Sébastien Launay
>Priority: Minor
> Fix For: 1.0.2, 2.1.0
>
>
> It looks like when calling {{PartitionStates.partitionSet()}}, the internal 
> state of the allocations can change while the resulting HashSet is being 
> built, which leads to a ConcurrentModificationException during the copy 
> operation.
> {code}
> java.util.ConcurrentModificationException
> at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at 
> java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> at java.util.HashSet.<init>(HashSet.java:119)
> at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import com.codahale.metrics.Gauge;
> import com.codahale.metrics.Metric;
> import com.codahale.metrics.MetricSet;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.MetricName;
> import static com.codahale.metrics.MetricRegistry.name;
> public class KafkaMetricSet implements MetricSet {
>     private final KafkaConsumer<?, ?> client;
>     public KafkaMetricSet(KafkaConsumer<?, ?> client) {
>         this.client = client;
>     }
>     @Override
>     public Map<String, Metric> getMetrics() {
>         final Map<String, Metric> gauges = new HashMap<>();
>         Map<MetricName, ? extends org.apache.kafka.common.Metric> m = client.metrics();
>         for (Map.Entry<MetricName, ? extends org.apache.kafka.common.Metric> e : m.entrySet()) {
>             gauges.put(name(e.getKey().group(), e.getKey().name(), "count"),
>                 new Gauge<Double>() {
>                     @Override
>                     public Double getValue() {
>                         return e.getValue().value(); // exception thrown here
>                     }
>                 });
>         }
>         return Collections.unmodifiableMap(gauges);
>     }
> }
> {code}
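
The fail-fast behaviour behind the trace above can be reproduced without Kafka. The sketch below is a single-threaded illustration, assuming only standard java.util collections: it mutates a LinkedHashMap while iterating its key set, which triggers the same check in LinkedHashIterator.nextNode. The snapshot method shows one possible mitigation (copying under a lock shared with writers); it is not necessarily the fix adopted for this issue, and all names here are hypothetical.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not Kafka code.
class FailFastSketch {
    // Returns true if a structural change during iteration is detected.
    static boolean mutateDuringIteration() {
        Map<String, Integer> state = new LinkedHashMap<>();
        state.put("topic-0", 0);
        state.put("topic-1", 1);
        try {
            for (String key : state.keySet()) {
                state.put(key + "-new", 2); // structural modification mid-iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // One possible mitigation: copy the keys while holding a lock that
    // writers are assumed to hold as well.
    static Set<String> snapshot(Map<String, Integer> state, Object lock) {
        synchronized (lock) {
            return new HashSet<>(state.keySet());
        }
    }
}
```

In the reported case the mutation comes from another thread rather than from the loop body, but the detection mechanism is the same.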





[jira] [Updated] (KAFKA-5479) Docs for authorization omit authorizer.class.name

2018-06-14 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5479:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since it is not ready for 2.0.0

> Docs for authorization omit authorizer.class.name
> -
>
> Key: KAFKA-5479
> URL: https://issues.apache.org/jira/browse/KAFKA-5479
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>  Labels: patch-available
> Fix For: 2.1.0
>
>
> The documentation in §7.4 Authorization and ACLs doesn't mention the 
> {{authorizer.class.name}} setting. 
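
For reference, a broker configuration fragment of the kind the documentation could show. This is a sketch assuming the built-in SimpleAclAuthorizer shipped with Kafka releases of this period; the super user name is a placeholder.

```properties
# server.properties: enable the built-in ACL authorizer
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# Placeholder principal; replace with the actual admin user
super.users=User:admin
```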





[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2018-06-14 Thread Don (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512051#comment-16512051
 ] 

Don commented on KAFKA-4113:


Hi Ben, you had a very interesting link in the original comment. Coincidentally 
we were looking into how to bootstrap KTable/GlobalKTable when you posted this. 
It's giving 404 now :/
Was there any technical reason for removing it? 

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would mean that on startup, 
> the current partition end offsets must be fetched and stored, and after the 
> KTable has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to a KTable is made for a certain
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until a record with timestamp 1000 is seen.
> {noformat}
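
The "main idea" above (record the end offsets at startup, then wait until the table has caught up to them) can be sketched as a simple check. This is a minimal illustration using plain maps keyed by partition number; BootstrapSketch and isPopulated are hypothetical names and not part of the Streams API.

```java
import java.util.Map;

// Hypothetical sketch; not part of the Kafka Streams API.
class BootstrapSketch {
    // endOffsets: per-partition end offsets captured once at startup.
    // positions:  per-partition offset of the next record to be read.
    // The table counts as populated once every partition has reached
    // the end offset recorded at startup.
    static boolean isPopulated(Map<Integer, Long> endOffsets, Map<Integer, Long> positions) {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long position = positions.getOrDefault(e.getKey(), 0L);
            if (position < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

Records appended after startup do not delay the end of the populating phase under this rule, because only the offsets captured at startup are compared.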


