[jira] [Updated] (KAFKA-9526) Augment topology description with serdes

2020-02-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9526:
---
Labels: needs-kip  (was: )

> Augment topology description with serdes
> 
>
> Key: KAFKA-9526
> URL: https://issues.apache.org/jira/browse/KAFKA-9526
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Today we have multiple ways to infer and inherit serdes along the topology, 
> and only fall back to the configured serde when inference does not apply. So 
> it is a bit hard for users to reason about which operators inside the topology 
> still lack a serde specification.
> So I'd propose we augment the topology description with serde information on 
> source / sink and state store operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9526) Augment topology description with serdes

2020-02-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9526:
---
Component/s: streams

> Augment topology description with serdes
> 
>
> Key: KAFKA-9526
> URL: https://issues.apache.org/jira/browse/KAFKA-9526
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today we have multiple ways to infer and inherit serdes along the topology, 
> and only fall back to the configured serde when inference does not apply. So 
> it is a bit hard for users to reason about which operators inside the topology 
> still lack a serde specification.
> So I'd propose we augment the topology description with serde information on 
> source / sink and state store operators.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9523) Reduce flakiness of BranchedMultiLevelRepartitionConnectedTopologyTest

2020-02-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9523:
---
Component/s: unit tests
 streams

> Reduce flakiness of BranchedMultiLevelRepartitionConnectedTopologyTest
> --
>
> Key: KAFKA-9523
> URL: https://issues.apache.org/jira/browse/KAFKA-9523
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Boyang Chen
>Priority: Major
>
> KAFKA-9335 introduced an integration test to verify that the topology builder 
> itself can survive building a complex topology. This test sometimes gets flaky 
> because of the stream client's connection to the broker, so we should consider 
> making it less flaky by either converting it to a unit test or just focusing on 
> making the test logic more robust.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032806#comment-17032806
 ] 

John Roesler commented on KAFKA-8307:
-

Thanks for the comment, [~guozhang]. It closely matches what has been bouncing 
around in my head: in general, that a full fix probably requires a number of 
different features, and specifically, that we need some way to map "compatible" 
topologies to each other. This would also let us deal better with topology 
evolution. But the thought is not very well formed at all.

> Kafka Streams should provide some mechanism to determine topology equality 
> and compatibility
> 
>
> Key: KAFKA-8307
> URL: https://issues.apache.org/jira/browse/KAFKA-8307
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: user-experience
>
> Currently, Streams provides no mechanism to compare two topologies. This is a 
> common operation when users want to have tests verifying that they don't 
> accidentally alter their topology. They would save the known-good topology 
> and then add a unit test verifying the current code against that known-good 
> state.
> However, because there's no way to do this comparison properly, everyone is 
> reduced to using the string format of the topology (from 
> `Topology#describe().toString()`). The major drawback is that the string 
> format is meant for human consumption. It is neither machine-parseable nor 
> stable. So, these compatibility tests are doomed to fail when any minor, 
> non-breaking, change is made either to the application, or to the library. 
> This trains everyone to update the test whenever it fails, undermining its 
> utility.
> We should fix this problem, and provide both a mechanism to serialize the 
> topology and to compare two topologies for compatibility. All in all, I think 
> we need:
> # a way to serialize/deserialize topology structure in a machine-parseable 
> format that is future-compatible. Offhand, I'd recommend serializing the 
> topology structure as JSON, and establishing a policy that attributes should 
> only be added to the object graph, never removed. Note, it's out of scope to 
> be able to actually run a deserialized topology; we only want to save and 
> load the structure (not the logic) to facilitate comparisons.
> # a method to verify the *equality* of two topologies... This method tells 
> you that the two topologies are structurally identical. We can't know if the 
> logic of any operator has changed, only if the structure of the graph is 
> changed. We can consider whether other graph properties, like serdes, should 
> be included.
> # a method to verify the *compatibility* of two topologies... This method 
> tells you that moving from topology A to topology B does not require an 
> application reset. Note that this operation is not commutative: 
> `A.compatibleWith(B)` does not imply `B.compatibleWith(A)`. We can discuss 
> whether `A.compatibleWith(B) && B.compatibleWith(A)` implies `A.equals(B)` (I 
> think not necessarily, because we may want "equality" to be stricter than 
> "compatibility").



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9518) NullPointerException on out-of-order topologies

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032805#comment-17032805
 ] 

John Roesler commented on KAFKA-9518:
-

Sure thing. I've linked them.

> NullPointerException on out-of-order topologies
> ---
>
> Key: KAFKA-9518
> URL: https://issues.apache.org/jira/browse/KAFKA-9518
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1, 2.4.0, 2.3.1
>Reporter: Murilo Tavares
>Priority: Minor
> Attachments: kafka-streams-testing.zip
>
>
> I have a KafkaStreams application that dynamically builds a topology based on 
> a Map of input-to-output topics. Since the map was not sorted, iteration was 
> unpredictable, and different instances could have different orders. When this 
> happens, KafkaStreams throws an exception during REBALANCE.
>  
> I was able to reproduce this using the attached java project. The project is 
> a pretty simple Maven project with one class. It starts 2 instances in 
> parallel, with the same input-to-output topics, but one instance takes the 
> topics in a reversed order.
>  
> The exception is this:
> {noformat}
> Exception in thread 
> "MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: stream-thread 
> [MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1] Failed to 
> rebalance.
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:234)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:176)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
> ... 3 more{noformat}
>  
> The topology for both instances:
> {code:java}
> // instance1
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [topicA])
>       --> KSTREAM-SINK-01
>     Sink: KSTREAM-SINK-01 (topic: topicA-repartitioned)
>       <-- KSTREAM-SOURCE-00
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-02 (topics: [topicB])
>       --> KSTREAM-SINK-03
>     Sink: KSTREAM-SINK-03 (topic: topicB-repartitioned)
>       <-- KSTREAM-SOURCE-02
> // instance2
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [topicB])
>       --> KSTREAM-SINK-01
>     Sink: KSTREAM-SINK-01 (topic: topicB-repartitioned)
>       <-- KSTREAM-SOURCE-00
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-02 (topics: [topicA])
>       --> KSTREAM-SINK-03
>     Sink: KSTREAM-SINK-03 (topic: topicA-repartitioned)
>       <-- KSTREAM-SOURCE-02{code}
> In my actual project, I fixed the issue by sorting the topics map 
> accordingly, but it would be nice to have at least a 

[jira] [Commented] (KAFKA-9374) Worker can be disabled by blocked connectors

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032791#comment-17032791
 ] 

ASF GitHub Bot commented on KAFKA-9374:
---

C0urante commented on pull request #8069: KAFKA-9374: Make connector 
interactions asynchronous
URL: https://github.com/apache/kafka/pull/8069
 
 
   [Jira ticket](https://issues.apache.org/jira/browse/KAFKA-9374)
   
   These changes allow herders to continue to function even when a connector 
they
   are running hangs in its start, stop, initialize, validate, and/or config
   methods.
   
   The main idea is to make these connector interactions asynchronous and accept
   a callback that can be invoked upon the completion (successful or otherwise) 
of
   these interactions. The distributed herder handles any follow-up logic by 
adding
   a new herder request to its queue in that callback, which helps preserve some
   synchronization and ordering guarantees provided by the current tick model.
   
   There are several items that still need to be addressed:
   
   1)  The standalone herder has not been updated to utilize the new 
asynchronous
   connector wrapper API provided by the Worker and AbstractHerder classes
   
   2)  There is a minor TODO in the DistributedHerderTest class regarding the 
need
   to migrate some testing logic into the AbstractHerderTest class
   
   3)  More significantly, since connector shutdown is now handled 
asynchronously,
   there are two problems with the current changes:
   
   a - It is possible (even likely) that a new instance of a connector will 
be
   created before an older instance has been shut down. This is 
especially
   problematic if a connector claims a shared resource such as a port
   number and could potentially lead to unnecessary connector failure on
   startup.
   
   b - There is no time allocated during shutdown of the herder for its
   connectors to shutdown, which may lead to improper resource cleanup.
   
   Existing unit tests for the distributed herder and worker have been modified 
to reflect these changes, and a new integration test named 
`BlockingConnectorTest` has been added to ensure that they work in practice.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Worker can be disabled by blocked connectors
> 
>
> Key: KAFKA-9374
> URL: https://issues.apache.org/jira/browse/KAFKA-9374
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, 
> 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> If a connector hangs during any of its {{initialize}}, {{start}}, {{stop}}, 
> {{taskConfigs}}, {{taskClass}}, {{version}}, {{config}}, or {{validate}} 
> methods, the worker will be disabled for some types of requests thereafter, 
> including connector creation, connector reconfiguration, and connector 
> deletion.
>  -This only occurs in distributed mode and is due to the threading model used 
> by the 
> [DistributedHerder|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java]
>  class.- This affects both distributed and standalone mode. Distributed 
> herders perform some connector work synchronously in their {{tick}} thread, 
> which also handles group membership and some REST requests. The majority of 
> the herder methods for the standalone herder are {{synchronized}}, including 
> those for creating, updating, and deleting connectors; as long as one of 
> those methods blocks, all subsequent calls to any of these methods will also 
> be blocked.
>  
> One potential solution could be to treat connectors that fail to start, stop, 
> etc. in time similarly to tasks that fail to stop within the [task graceful 
> shutdown timeout 
> period|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L121-L126]
>  by handling all connector interactions on a separate thread, waiting for 
> them to complete within a timeout, and abandoning the thread (and 
> transitioning the connector to the {{FAILED}} state, if it has been created 
> at all) if that timeout expires.
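> A rough sketch of that timeout-and-abandon idea (class and method names made 
> up here, not the actual patch):
> {code:java}
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Future;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> 
> public class ConnectorInteractionRunner {
>     private final ExecutorService executor = Executors.newCachedThreadPool();
> 
>     /** Returns true if the interaction finished in time, false if it was abandoned. */
>     public boolean runWithTimeout(Runnable connectorInteraction, long timeoutMs) {
>         Future<?> result = executor.submit(connectorInteraction);
>         try {
>             result.get(timeoutMs, TimeUnit.MILLISECONDS);
>             return true;
>         } catch (TimeoutException e) {
>             // Abandon the hung thread; the caller would transition the connector to FAILED.
>             result.cancel(true);
>             return false;
>         } catch (Exception e) {
>             return false; // the interaction itself failed
>         }
>     }
> }
> {code}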



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9526) Augment topology description with serdes

2020-02-07 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-9526:


 Summary: Augment topology description with serdes
 Key: KAFKA-9526
 URL: https://issues.apache.org/jira/browse/KAFKA-9526
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang
Assignee: Guozhang Wang


Today we have multiple ways to infer and inherit serdes along the topology, and 
only fall back to the configured serde when inference does not apply. So it is 
a bit hard for users to reason about which operators inside the topology still 
lack a serde specification.

So I'd propose we augment the topology description with serde information on 
source / sink and state store operators.
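
For illustration, a small Streams DSL sketch (hypothetical topic names) of why 
this is hard to reason about today: serdes are explicit on the source, inherited 
by the downstream operator, and fall back to the configured defaults on the 
sink, yet none of this shows up in the topology description.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;

public class SerdeInferenceExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.Long())) // explicit serdes
               .mapValues(v -> v + 1)   // serdes inherited from the upstream source
               .to("output");           // no Produced: falls back to the configured defaults
        System.out.println(builder.build().describe()); // today: no serde information printed
    }
}
{code}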



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer

2020-02-07 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-9525:
--

Assignee: Sophie Blee-Goldman

> Allow explicit rebalance triggering on the Consumer
> ---
>
> Key: KAFKA-9525
> URL: https://issues.apache.org/jira/browse/KAFKA-9525
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> Currently the only way to explicitly trigger a rebalance is by unsubscribing 
> the consumer. This has two drawbacks: it does not work with static 
> membership, and it causes the consumer to revoke all its currently owned 
> partitions. Streams relies on being able to enforce a rebalance for its 
> version probing upgrade protocol and the upcoming KIP-441, both of which 
> should be able to work with static membership and be able to leverage the 
> improvements of KIP-429 to no longer revoke all owned partitions.
> We should add an API that will allow users to explicitly trigger a rebalance 
> without going through #unsubscribe



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032788#comment-17032788
 ] 

John Roesler commented on KAFKA-9525:
-

Agreed. If "trigger a rebalance" is the desired operation, then the best thing 
is to offer a way to do exactly that. 

> Allow explicit rebalance triggering on the Consumer
> ---
>
> Key: KAFKA-9525
> URL: https://issues.apache.org/jira/browse/KAFKA-9525
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> Currently the only way to explicitly trigger a rebalance is by unsubscribing 
> the consumer. This has two drawbacks: it does not work with static 
> membership, and it causes the consumer to revoke all its currently owned 
> partitions. Streams relies on being able to enforce a rebalance for its 
> version probing upgrade protocol and the upcoming KIP-441, both of which 
> should be able to work with static membership and be able to leverage the 
> improvements of KIP-429 to no longer revoke all owned partitions.
> We should add an API that will allow users to explicitly trigger a rebalance 
> without going through #unsubscribe



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9206) Consumer should handle `CORRUPT_MESSAGE` error code in fetch response

2020-02-07 Thread Agam Brahma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Agam Brahma reassigned KAFKA-9206:
--

Assignee: Agam Brahma

> Consumer should handle `CORRUPT_MESSAGE` error code in fetch response
> -
>
> Key: KAFKA-9206
> URL: https://issues.apache.org/jira/browse/KAFKA-9206
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Agam Brahma
>Priority: Major
>
> This error code is possible, for example, when the broker scans the log to 
> find the fetch offset after the index lookup. Currently this results in a 
> slightly obscure message such as the following:
> {code:java}
> java.lang.IllegalStateException: Unexpected error code 2 while fetching 
> data{code}
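> For reference, a small sketch (using only the public Errors enum, not any 
> consumer internals) showing what error code 2 maps to and the clearer message 
> that could be surfaced instead:
> {code:java}
> import org.apache.kafka.common.protocol.Errors;
> 
> public class ErrorCodeLookup {
>     public static void main(String[] args) {
>         Errors error = Errors.forCode((short) 2);
>         System.out.println(error);                          // CORRUPT_MESSAGE
>         System.out.println(error.exception().getMessage()); // human-readable description
>     }
> }
> {code}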
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9507) AdminClient should check for missing committed offsets

2020-02-07 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9507.

Fix Version/s: 2.4.1
   2.3.2
   2.5.0
   Resolution: Fixed

> AdminClient should check for missing committed offsets
> --
>
> Key: KAFKA-9507
> URL: https://issues.apache.org/jira/browse/KAFKA-9507
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: David Mao
>Priority: Major
>  Labels: newbie
> Fix For: 2.5.0, 2.3.2, 2.4.1
>
>
> I noticed this exception getting raised:
> {code}
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>   at 
> org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160)
> {code}
> The AdminClient should check for negative offsets in OffsetFetch responses in 
> the api `listConsumerGroupOffsets`.
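> A sketch of the affected call path (hypothetical bootstrap server and group 
> id); the intent is for partitions with no committed offset to be reported as 
> missing rather than surfacing as the negative-offset exception above:
> {code:java}
> import java.util.Map;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.TopicPartition;
> 
> public class GroupOffsetCheck {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         try (AdminClient admin = AdminClient.create(props)) {
>             Map<TopicPartition, OffsetAndMetadata> offsets =
>                 admin.listConsumerGroupOffsets("my-group")
>                      .partitionsToOffsetAndMetadata()
>                      .get();
>             offsets.forEach((tp, om) -> System.out.println(
>                 tp + " -> " + (om == null ? "no committed offset" : String.valueOf(om.offset()))));
>         }
>     }
> }
> {code}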



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9507) AdminClient should check for missing committed offsets

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032770#comment-17032770
 ] 

ASF GitHub Bot commented on KAFKA-9507:
---

hachikuji commented on pull request #8057: KAFKA-9507 AdminClient should check 
for missing committed offsets
URL: https://github.com/apache/kafka/pull/8057
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AdminClient should check for missing committed offsets
> --
>
> Key: KAFKA-9507
> URL: https://issues.apache.org/jira/browse/KAFKA-9507
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: David Mao
>Priority: Major
>  Labels: newbie
>
> I noticed this exception getting raised:
> {code}
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>   at 
> org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160)
> {code}
> The AdminClient should check for negative offsets in OffsetFetch responses in 
> the api `listConsumerGroupOffsets`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2020-02-07 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032769#comment-17032769
 ] 

Jason Gustafson edited comment on KAFKA-8940 at 2/8/20 12:41 AM:
-

Found a new instance of this: 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4554/testReport/junit/org.apache.kafka.streams.integration/SmokeTestDriverIntegrationTest/shouldWorkWithRebalance/
{code}
java.lang.AssertionError: verifying tagg
fail: key=470 tagg=[ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 
0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized 
value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key 
= 470, value = 1)] expected=0
 taggEvents: [ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 
0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized 
value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key 
= 470, value = 1)]
verifying suppressed min-suppressed
verifying min-suppressed with 20 keys
fail: resultCount=20 expectedCount=10
result=[[4-1003@158112000/158120640], 
[7-1006@158103360/158112000], [1-1000@158103360/158112000], 
[9-1008@158112000/158120640], [0-999@158112000/158120640], 
[9-1008@158103360/158112000], [3-1002@158103360/158112000], 
[3-1002@158112000/158120640], [8-1007@158112000/158120640], 
[8-1007@158103360/158112000], [2-1001@158112000/158120640], 
[2-1001@158103360/158112000], [7-1006@158112000/158120640], 
[1-1000@158112000/158120640], [0-999@158103360/158112000], 
[4-1003@158103360/158112000], [6-1005@158112000/158120640], 
[6-1005@158103360/158112000], [5-1004@158103360/158112000], 
[5-1004@158112000/158120640]]
expected=[7-1006, 3-1002, 0-999, 1-1000, 4-1003, 2-1001, 5-1004, 
8-1007, 6-1005, 9-1008]
verifying suppressed sws-suppressed
verifying min with 10 keys
min fail: key=7-1006 actual=535 expected=7
{code}


was (Author: hachikuji):
Found a new instance of this: 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4554/testReport/junit/org.apache.kafka.streams.integration/SmokeTestDriverIntegrationTest/shouldWorkWithRebalance/
```
java.lang.AssertionError: verifying tagg
fail: key=470 tagg=[ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 
0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized 
value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key 
= 470, value = 1)] expected=0
 taggEvents: [ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 
0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized 
value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key 
= 470, value = 1)]
verifying suppressed min-suppressed
verifying min-suppressed with 20 keys
fail: resultCount=20 expectedCount=10
result=[[4-1003@158112000/158120640], 
[7-1006@158103360/158112000], [1-1000@158103360/158112000], 
[9-1008@158112000/158120640], [0-999@158112000/158120640], 
[9-1008@158103360/158112000], [3-1002@158103360/158112000], 
[3-1002@158112000/158120640], [8-1007@158112000/158120640], 
[8-1007@158103360/158112000], [2-1001@158112000/158120640], 
[2-1001@158103360/158112000], [7-1006@158112000/158120640], 
[1-1000@158112000/158120640], [0-999@158103360/158112000], 
[4-1003@158103360/158112000], [6-1005@158112000/158120640], 
[6-1005@158103360/158112000], [5-1004@158103360/158112000], 
[5-1004@158112000/158120640]]
expected=[7-1006, 3-1002, 0-999, 1-1000, 4-1003, 2-1001, 5-1004, 
8-1007, 6-1005, 9-1008]
verifying suppressed sws-suppressed
verifying min with 10 keys
min fail: key=7-1006 actual=535 expected=7
```

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> I lost the screen shot unfortunately... it reports the set of expected 
> records does not match the received records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2020-02-07 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032769#comment-17032769
 ] 

Jason Gustafson commented on KAFKA-8940:


Found a new instance of this: 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4554/testReport/junit/org.apache.kafka.streams.integration/SmokeTestDriverIntegrationTest/shouldWorkWithRebalance/
```
java.lang.AssertionError: verifying tagg
fail: key=470 tagg=[ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 
0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized 
value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key 
= 470, value = 1)] expected=0
 taggEvents: [ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 
0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized 
value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key 
= 470, value = 1)]
verifying suppressed min-suppressed
verifying min-suppressed with 20 keys
fail: resultCount=20 expectedCount=10
result=[[4-1003@158112000/158120640], 
[7-1006@158103360/158112000], [1-1000@158103360/158112000], 
[9-1008@158112000/158120640], [0-999@158112000/158120640], 
[9-1008@158103360/158112000], [3-1002@158103360/158112000], 
[3-1002@158112000/158120640], [8-1007@158112000/158120640], 
[8-1007@158103360/158112000], [2-1001@158112000/158120640], 
[2-1001@158103360/158112000], [7-1006@158112000/158120640], 
[1-1000@158112000/158120640], [0-999@158103360/158112000], 
[4-1003@158103360/158112000], [6-1005@158112000/158120640], 
[6-1005@158103360/158112000], [5-1004@158103360/158112000], 
[5-1004@158112000/158120640]]
expected=[7-1006, 3-1002, 0-999, 1-1000, 4-1003, 2-1001, 5-1004, 
8-1007, 6-1005, 9-1008]
verifying suppressed sws-suppressed
verifying min with 10 keys
min fail: key=7-1006 actual=535 expected=7
```

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-8940
> URL: https://issues.apache.org/jira/browse/KAFKA-8940
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> I lost the screen shot unfortunately... it reports the set of expected 
> records does not match the received records.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9254) Updating Kafka Broker configuration dynamically twice reverts log configuration to default

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032766#comment-17032766
 ] 

ASF GitHub Bot commented on KAFKA-9254:
---

soondenana commented on pull request #8067: KAFKA-9254; Overridden topic 
configs are reset after dynamic default change (#7870)
URL: https://github.com/apache/kafka/pull/8067
 
 
   Currently, when a dynamic change is made to the broker-level default log 
configuration, existing log configs are recreated with empty overridden 
configs. In that case, when dynamic broker configs are updated a second time, 
the topic-level configs are lost. This can cause unexpected data loss, for 
example, if the cleanup policy changes from "compact" to "delete."
   
   Reviewers: Rajini Sivaram , Jason Gustafson 

   (cherry picked from commit 0e7f867041959c5d77727c7f5ce32d363fa09fc2)
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Updating Kafka Broker configuration dynamically twice reverts log 
> configuration to default
> --
>
> Key: KAFKA-9254
> URL: https://issues.apache.org/jira/browse/KAFKA-9254
> Project: Kafka
>  Issue Type: Bug
>  Components: config, log, replication
>Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.4.0, 2.3.1
>Reporter: fenghong
>Assignee: huxihx
>Priority: Critical
> Fix For: 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.1
>
>
> We are engineers at Huobi and have encountered a Kafka bug: modifying 
> DynamicBrokerConfig more than two times invalidates unrelated topic-level 
> configuration.
> The bug can be reproduced as follows:
>  # Set the Kafka broker config min.insync.replicas=3 in server.properties
>  # Create topic test-1 and set the topic-level config min.insync.replicas=2
>  # Dynamically modify the configuration twice as shown below
> {code:java}
> bin/kafka-configs.sh --bootstrap-server xxx:9092 --entity-type brokers 
> --entity-default --alter --add-config log.message.timestamp.type=LogAppendTime
> bin/kafka-configs.sh --bootstrap-server xxx:9092 --entity-type brokers 
> --entity-default --alter --add-config log.retention.ms=60480
> {code}
>  # Stop a Kafka server and observe the exception shown below
>  org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync 
> replicas for partition test-1-0 is [2], below required minimum [3]
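> For reference, a Java sketch equivalent to step 3 above (hypothetical 
> bootstrap address); ConfigResource.Type.BROKER with an empty resource name 
> addresses the cluster-wide broker default:
> {code:java}
> import java.util.Collection;
> import java.util.Collections;
> import java.util.Map;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.AlterConfigOp;
> import org.apache.kafka.clients.admin.ConfigEntry;
> import org.apache.kafka.common.config.ConfigResource;
> 
> public class DynamicDefaultConfig {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092");
>         try (AdminClient admin = AdminClient.create(props)) {
>             // Empty name = cluster-wide broker default.
>             ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
>             AlterConfigOp op = new AlterConfigOp(
>                 new ConfigEntry("log.message.timestamp.type", "LogAppendTime"),
>                 AlterConfigOp.OpType.SET);
>             Map<ConfigResource, Collection<AlterConfigOp>> update =
>                 Collections.singletonMap(brokerDefaults, Collections.singletonList(op));
>             admin.incrementalAlterConfigs(update).all().get();
>         }
>     }
> }
> {code}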
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer

2020-02-07 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032763#comment-17032763
 ] 

Sophie Blee-Goldman commented on KAFKA-9525:


Well, the point is that we want to be able to trigger a rebalance without 
removing a member. That would definitely solve the static membership problem, 
but we still want the option to trigger a rebalance that is as lightweight as 
possible for KIP-441, i.e. without having to revoke all partitions or actually 
leave the group. 

> Allow explicit rebalance triggering on the Consumer
> ---
>
> Key: KAFKA-9525
> URL: https://issues.apache.org/jira/browse/KAFKA-9525
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> Currently the only way to explicitly trigger a rebalance is by unsubscribing 
> the consumer. This has two drawbacks: it does not work with static 
> membership, and it causes the consumer to revoke all its currently owned 
> partitions. Streams relies on being able to enforce a rebalance for its 
> version probing upgrade protocol and the upcoming KIP-441, both of which 
> should be able to work with static membership and be able to leverage the 
> improvements of KIP-429 to no longer revoke all owned partitions.
> We should add an API that will allow users to explicitly trigger a rebalance 
> without going through #unsubscribe



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer

2020-02-07 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032762#comment-17032762
 ] 

Boyang Chen commented on KAFKA-9525:


If you are talking about application-level rebalance triggering, then after 
https://issues.apache.org/jira/browse/KAFKA-9146 we could potentially have the 
admin client remove a single member to trigger a rebalance, which is pretty 
convenient.

> Allow explicit rebalance triggering on the Consumer
> ---
>
> Key: KAFKA-9525
> URL: https://issues.apache.org/jira/browse/KAFKA-9525
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> Currently the only way to explicitly trigger a rebalance is by unsubscribing 
> the consumer. This has two drawbacks: it does not work with static 
> membership, and it causes the consumer to revoke all its currently owned 
> partitions. Streams relies on being able to enforce a rebalance for its 
> version probing upgrade protocol and the upcoming KIP-441, both of which 
> should be able to work with static membership and be able to leverage the 
> improvements of KIP-429 to no longer revoke all owned partitions.
> We should add an API that will allow users to explicitly trigger a rebalance 
> without going through #unsubscribe



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9519) Deprecate ZooKeeper access for kafka-configs.sh

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032760#comment-17032760
 ] 

ASF GitHub Bot commented on KAFKA-9519:
---

cmccabe commented on pull request #8056: KAFKA-9519: Deprecating ZK for 
ConfigCommand
URL: https://github.com/apache/kafka/pull/8056
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deprecate ZooKeeper access for kafka-configs.sh   
> 
>
> Key: KAFKA-9519
> URL: https://issues.apache.org/jira/browse/KAFKA-9519
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 2.5.0
>Reporter: Sanjana Kaundinya
>Assignee: Sanjana Kaundinya
>Priority: Major
> Fix For: 2.5.0
>
>
> As part of KIP-555, ZooKeeper access must be deprecated for 
> kafka-configs.sh.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4090) JVM runs into OOM if (Java) client uses a SSL port without setting the security protocol

2020-02-07 Thread David Mollitor (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032753#comment-17032753
 ] 

David Mollitor commented on KAFKA-4090:
---

OK.  I took a stab at this and I added a unit test to simulate the SSL response 
laid out in KIP-498.

I don't think the answer is to use some sort of max buffer size.  That feature 
may be nice as protection from a misconfigured server, but not for this use 
case.

As pointed out in KIP-498, the server identifies that this is a plain-text 
connection, but the client is unable to identify the connection as being 
SSL-enabled.
How is that the case?  Well, the server buffers data as it is being read in 
chunks (8K perhaps).  Once it reads enough data into the buffer, it runs a 
quick scan on the bytes to try to detect SSL vs. plaintext.

I tried putting the logic into the client, but it was very difficult.  The 
client does not buffer data in chunks in the same way.  It reads 4 bytes from 
the stream, and when those four bytes are read, it parses the size and creates 
the necessary payload buffer.  It's very simple.

There's just not a lot of flexibility in the existing setup.  Once the data is 
consumed from the stream, you can't put it back.  That is, there is no way to 
read 16 bytes, check for SSL, and if it's not SSL, put the bytes back into the 
front of the stream (or just inspect without consuming) for normal processing.  
Once those 16 bytes are consumed, they are consumed... even if the stream has 
two 8 byte packets one-after-the-other.  In this scenario, reading 16 bytes to 
parse one packet just ate a packet.

To detect SSL, Netty (Apache 2.0) requires 5 bytes.  So, I am doing the same.  
I am consuming 4 bytes and not loading a buffer at that time.  If the stream 
produces just 1 more byte, then I check if SSL is present, and if not, I create 
the required buffer.  It is much like a lazy-load implementation.  The payload 
buffer doesn't get created unless it absolutely has to.

Unfortunately, the unit tests bit me hard, because they all assume there will 
be two reads: one 4-byte read to get the payload size (and create the payload 
buffer), then one read to get the payload itself.  So, the mocking all mimics 
two reads.  My code does 3 reads: 1 for the size, 1 to get one more byte to 
test for SSL, then 1 for the payload. For anything that is a "live" test, with 
a test socket actually receiving data, this is brittle but generally works 
because it is very unlikely that the socket won't have at least 4 bytes for the 
size in the first read.  However, I think some of these live tests would fail 
if only 1 or 2 bytes came from the stream at a time.  There is one unit test 
that counts the number of reads that take place.  I had to disable it because 
it assumes the payload buffer is created after only 1 read (a flaky assumption 
if the data comes in slowly); it blocks (for testing purposes) any subsequent 
reads, so my implementation never creates the lazy payload buffer because a 
second read is never triggered.
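
For reference, a rough sketch of the kind of 5-byte check described above (in 
the spirit of Netty's detection, not the actual patch): a TLS record starts 
with a content-type byte (20-23, e.g. 22 for handshake) followed by a 3.x 
protocol version, so 5 bytes are enough to tell TLS apart from a plaintext 
Kafka size header.
{code:java}
import java.nio.ByteBuffer;

public final class TlsDetector {

    /** Requires at least 5 readable bytes; does not consume the buffer. */
    public static boolean looksLikeTls(ByteBuffer buf) {
        if (buf.remaining() < 5) {
            throw new IllegalArgumentException("need at least 5 bytes");
        }
        int contentType = buf.get(buf.position()) & 0xFF;       // 20..23 for TLS record types
        int majorVersion = buf.get(buf.position() + 1) & 0xFF;  // 3 for SSLv3/TLS
        return contentType >= 20 && contentType <= 23 && majorVersion == 3;
    }
}
{code}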






> JVM runs into OOM if (Java) client uses a SSL port without setting the 
> security protocol
> 
>
> Key: KAFKA-4090
> URL: https://issues.apache.org/jira/browse/KAFKA-4090
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1, 0.10.0.1, 2.1.0
>Reporter: Jaikiran Pai
>Assignee: Alexandre Dupriez
>Priority: Major
>
> Quoting from the mail thread that was sent to Kafka mailing list:
> {quote}
> We have been using Kafka 0.9.0.1 (server and Java client libraries). So far 
> we had been using it with plaintext transport but recently have been 
> considering upgrading to using SSL. It mostly works except that a 
> mis-configured producer (and even consumer) causes a hard-to-relate 
> OutOfMemory exception, causing the JVM in which the client is 
> running to go into a bad state. We can consistently reproduce that OOM very 
> easily. We decided to check if this is something that is fixed in 0.10.0.1 so 
> upgraded one of our test systems to that version (both server and client 
> libraries) but still see the same issue. Here's how it can be easily 
> reproduced
> 1. Enable SSL listener on the broker via server.properties, as per the Kafka 
> documentation
> {code}
> listeners=PLAINTEXT://:9092,SSL://:9093
> ssl.keystore.location=
> ssl.keystore.password=pass
> ssl.key.password=pass
> ssl.truststore.location=
> ssl.truststore.password=pass
> {code}
> 2. Start zookeeper and kafka server
> 3. Create a "oom-test" topic (which will be used for these tests):
> {code}
> kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test  
> --partitions 1 --replication-factor 1
> {code}
> 4. Create a simple producer which 

[jira] [Commented] (KAFKA-4090) JVM runs into OOM if (Java) client uses a SSL port without setting the security protocol

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032746#comment-17032746
 ] 

ASF GitHub Bot commented on KAFKA-4090:
---

belugabehr commented on pull request #8066: KAFKA-4090: Validate SSL connection 
in client
URL: https://github.com/apache/kafka/pull/8066
 
 
   Issue has been around for a while.  Will post update on JIRA.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JVM runs into OOM if (Java) client uses a SSL port without setting the 
> security protocol
> 
>
> Key: KAFKA-4090
> URL: https://issues.apache.org/jira/browse/KAFKA-4090
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.9.0.1, 0.10.0.1, 2.1.0
>Reporter: Jaikiran Pai
>Assignee: Alexandre Dupriez
>Priority: Major
>
> Quoting from the mail thread that was sent to Kafka mailing list:
> {quote}
> We have been using Kafka 0.9.0.1 (server and Java client libraries). So far 
> we had been using it with plaintext transport but recently have been 
> considering upgrading to using SSL. It mostly works except that a 
> mis-configured producer (and even consumer) causes a hard-to-relate 
> OutOfMemory exception, causing the JVM in which the client is 
> running to go into a bad state. We can consistently reproduce that OOM very 
> easily. We decided to check if this is something that is fixed in 0.10.0.1 so 
> upgraded one of our test systems to that version (both server and client 
> libraries) but still see the same issue. Here's how it can be easily 
> reproduced
> 1. Enable SSL listener on the broker via server.properties, as per the Kafka 
> documentation
> {code}
> listeners=PLAINTEXT://:9092,SSL://:9093
> ssl.keystore.location=
> ssl.keystore.password=pass
> ssl.key.password=pass
> ssl.truststore.location=
> ssl.truststore.password=pass
> {code}
> 2. Start zookeeper and kafka server
> 3. Create a "oom-test" topic (which will be used for these tests):
> {code}
> kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test  
> --partitions 1 --replication-factor 1
> {code}
> 4. Create a simple producer which sends a single message to the topic via 
> Java (new producer) APIs:
> {code}
> public class OOMTest {
> public static void main(final String[] args) throws Exception {
> final Properties kafkaProducerConfigs = new Properties();
> // NOTE: Intentionally use a SSL port without specifying 
> security.protocol as SSL
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9093");
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> 
> kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>  StringSerializer.class.getName());
> try (KafkaProducer<String, String> producer = new 
> KafkaProducer<>(kafkaProducerConfigs)) {
> System.out.println("Created Kafka producer");
> final String topicName = "oom-test";
> final String message = "Hello OOM!";
> // send a message to the topic
> final Future<RecordMetadata> recordMetadataFuture = 
> producer.send(new ProducerRecord<>(topicName, message));
> final RecordMetadata sentRecordMetadata = 
> recordMetadataFuture.get();
> System.out.println("Sent message '" + message + "' to topic '" + 
> topicName + "'");
> }
> System.out.println("Tests complete");
> }
> }
> {code}
> Notice that the server URL is using a SSL endpoint localhost:9093 but isn't 
> specifying any of the other necessary SSL configs like security.protocol.
> 5. For the sake of easily reproducing this issue run this class with a max 
> heap size of 256MB (-Xmx256M). Running this code throws up the following 
> OutOfMemoryError in one of the Sender threads:
> {code}
> 18:33:25,770 ERROR [KafkaThread] - Uncaught exception in 
> kafka-producer-network-thread | producer-1:
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
> at 
> 

[jira] [Commented] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean

2020-02-07 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032739#comment-17032739
 ] 

Ted Yu commented on KAFKA-9504:
---

It seems that closing the metrics is not enough to prevent the memory 
leak:
{code}
Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", 
firstException);
Utils.closeQuietly(metrics, "consumer metrics", firstException);
{code}

> Memory leak in KafkaMetrics registered to MBean
> ---
>
> Key: KAFKA-9504
> URL: https://issues.apache.org/jira/browse/KAFKA-9504
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: Andreas Holmén
>Priority: Major
>
> After close() is called on a KafkaConsumer, some registered MBeans are not 
> unregistered, causing a leak.
>  
>  
> {code:java}
> import static 
> org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
> import java.lang.management.ManagementFactory;
> import java.util.HashMap;
> import java.util.Map;
> import javax.management.MBeanServer;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> public class Leaker {
>  private static String bootstrapServers = "hostname:9092";
>  
>  public static void main(String[] args) throws InterruptedException {
>   MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
>   Map<String, Object> props = new HashMap<>();
>   props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>  
>   int beans = mBeanServer.getMBeanCount();
>   for (int i = 0; i < 100; i++) {
>   KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new 
> ByteArrayDeserializer(), new ByteArrayDeserializer());
>consumer.close();
>   }
>   int newBeans = mBeanServer.getMBeanCount();
>   System.out.println("\nbeans delta: " + (newBeans - beans));
>  }
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer

2020-02-07 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-9525:
--

 Summary: Allow explicit rebalance triggering on the Consumer
 Key: KAFKA-9525
 URL: https://issues.apache.org/jira/browse/KAFKA-9525
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Sophie Blee-Goldman


Currently the only way to explicitly trigger a rebalance is by unsubscribing 
the consumer. This has two drawbacks: it does not work with static membership, 
and it causes the consumer to revoke all its currently owned partitions. 
Streams relies on being able to enforce a rebalance for its version probing 
upgrade protocol and the upcoming KIP-441, both of which should be able to work 
with static membership and be able to leverage the improvements of KIP-429 to 
no longer revoke all owned partitions.

We should add an API that will allow users to explicitly trigger a rebalance 
without going through #unsubscribe
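
For context, a minimal sketch of the only option available today (helper class 
name made up), which illustrates both drawbacks:
{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;

public class ForcedRebalance {
    public static void forceRebalance(Consumer<?, ?> consumer, Collection<String> topics) {
        consumer.unsubscribe();     // revokes all owned partitions and leaves the group
        consumer.subscribe(topics); // re-joining triggers a rebalance on the next poll()
    }
}
{code}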



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9524) Default window retention does not consider grace period

2020-02-07 Thread Michael Bingham (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032715#comment-17032715
 ] 

Michael Bingham commented on KAFKA-9524:


Good point about the workaround. Thanks [~vvcephei]!

> Default window retention does not consider grace period
> ---
>
> Key: KAFKA-9524
> URL: https://issues.apache.org/jira/browse/KAFKA-9524
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Priority: Minor
>
> In a windowed aggregation, if you specify a window size larger than the 
> default window retention (1 day), Streams will implicitly set retention 
> accordingly to accommodate windows of that size. For example, 
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20))) 
> {code}
> In this case, Streams will implicitly set window retention to 20 days, and no 
> exceptions will occur.
> However, if you also include a non-zero grace period on the window, such as:
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
> {code}
> In this case, Streams will still implicitly set the window retention to 20 days 
> (not 20 days + 5 minutes grace), and an exception will be thrown:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: The retention 
> period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
> than its window size plus the grace period. Got size=[172800], 
> grace=[30], retention=[172800]{code}
> Ideally, Streams should include grace period when implicitly setting window 
> retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9524) Default window retention does not consider grace period

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032714#comment-17032714
 ] 

John Roesler commented on KAFKA-9524:
-

Thanks for the report, [~mikebin]!

FWIW, the workaround is to also set the retention time on the store (using 
Materialized) to be at least windowSize + grace. I just mention that for anyone 
who happens to find this ticket after searching the error message.

I agree that we could and should configure the store's retention properly. If I 
recall correctly, we need to remove the deprecated method:
> org.apache.kafka.streams.kstream.TimeWindows#maintainMs
and the other deprecated members of Windows and its children.

They've been deprecated since 2.1, so it's probably been long enough by now for 
us to go ahead and remove them and make Streams behave more nicely in this case.
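
A sketch of that workaround (hypothetical topic and store names): set the store 
retention explicitly to at least window size + grace via Materialized, so the 
implicit-retention check no longer fires.
{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class GraceRetentionWorkaround {
    public static void main(String[] args) {
        Duration windowSize = Duration.ofDays(20);
        Duration grace = Duration.ofMinutes(5);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(windowSize).grace(grace))
               .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-counts")
                       .withRetention(windowSize.plus(grace))); // must be >= size + grace
        builder.build();
    }
}
{code}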

> Default window retention does not consider grace period
> ---
>
> Key: KAFKA-9524
> URL: https://issues.apache.org/jira/browse/KAFKA-9524
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Priority: Minor
>
> In a windowed aggregation, if you specify a window size larger than the 
> default window retention (1 day), Streams will implicitly set retention 
> accordingly to accommodate windows of that size. For example, 
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20))) 
> {code}
> In this case, Streams will implicitly set window retention to 20 days, and no 
> exceptions will occur.
> However, if you also include a non-zero grace period on the window, such as:
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
> {code}
> In this case, Streams will still implicitly set the window retention to 20 days 
> (not 20 days + 5 minutes grace), and an exception will be thrown:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: The retention 
> period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
> than its window size plus the grace period. Got size=[172800], 
> grace=[30], retention=[172800]{code}
> Ideally, Streams should include grace period when implicitly setting window 
> retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9524) Default window retention does not consider grace period

2020-02-07 Thread Michael Bingham (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Bingham updated KAFKA-9524:
---
Description: 
In a windowed aggregation, if you specify a window size larger than the default 
window retention (1 day), Streams will implicitly set retention accordingly to 
accommodate windows of that size. For example, 
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20))) 
{code}
In this case, Streams will implicitly set window retention to 20 days, and no 
exceptions will occur.

However, if you also include a non-zero grace period on the window, such as:
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
{code}
In this case, Streams will still implicitly set the window retention to 20 days 
(not 20 days + 5 minutes grace), and an exception will be thrown:
{code:java}
Exception in thread "main" java.lang.IllegalArgumentException: The retention 
period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
than its window size plus the grace period. Got size=[172800], 
grace=[30], retention=[172800]{code}
Ideally, Streams should include grace period when implicitly setting window 
retention.

  was:
In a windowed aggregation, if you specify a window size larger than the default 
window retention (1 day), Streams will implicitly set retention accordingly to 
accommodate windows of that size. For example, 
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20))) 
{code}
In this case, Streams will implicitly set window retention to 20 days, and no 
exceptions will occur.

However, if you also include a non-zero grace period on the window, such as:
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
{code}
In this case, Streams will still implicitly set the window retention 20 days 
(not 20 days + 5 minutes grace), and an exception will be thrown:
 Exception in thread "main" java.lang.IllegalArgumentException: The retention 
period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
than its window size plus the grace period. Got size=[172800], 
grace=[30], retention=[172800]

Ideally, Streams should include grace period when implicitly setting window 
retention.


> Default window retention does not consider grace period
> ---
>
> Key: KAFKA-9524
> URL: https://issues.apache.org/jira/browse/KAFKA-9524
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Priority: Minor
>
> In a windowed aggregation, if you specify a window size larger than the 
> default window retention (1 day), Streams will implicitly set retention 
> accordingly to accommodate windows of that size. For example, 
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20))) 
> {code}
> In this case, Streams will implicitly set window retention to 20 days, and no 
> exceptions will occur.
> However, if you also include a non-zero grace period on the window, such as:
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
> {code}
> In this case, Streams will still implicitly set the window retention 20 days 
> (not 20 days + 5 minutes grace), and an exception will be thrown:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: The retention 
> period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
> than its window size plus the grace period. Got size=[172800], 
> grace=[30], retention=[172800]{code}
> Ideally, Streams should include grace period when implicitly setting window 
> retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9524) Default window retention does not consider grace period

2020-02-07 Thread Michael Bingham (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Bingham updated KAFKA-9524:
---
Description: 
In a windowed aggregation, if you specify a window size larger than the default 
window retention (1 day), Streams will implicitly set retention accordingly to 
accommodate windows of that size. For example, 
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20))) 
{code}
In this case, Streams will implicitly set window retention to 20 days, and no 
exceptions will occur.

However, if you also include a non-zero grace period on the window, such as:
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
{code}
In this case, Streams will still implicitly set the window retention 20 days 
(not 20 days + 5 minutes grace), and an exception will be thrown:
 Exception in thread "main" java.lang.IllegalArgumentException: The retention 
period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
than its window size plus the grace period. Got size=[172800], 
grace=[30], retention=[172800]

Ideally, Streams should include grace period when implicitly setting window 
retention.

  was:
In a windowed aggregation, if you specify a window size larger than the default 
window retention (1 day), Streams will implicitly set retention accordingly to 
accommodate windows of that size. For example,

 
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20))) 
{code}
In this case, Streams will implicitly set window retention to 20 days, and no 
exceptions will occur.

However, if you also include a non-zero grace period on the window, such as:

 
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
{code}
In this case, Streams will still implicitly set the window retention 20 days 
(not 20 days + 5 minutes grace), and an exception will be thrown:
Exception in thread "main" java.lang.IllegalArgumentException: The retention 
period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
than its window size plus the grace period. Got size=[172800], 
grace=[30], retention=[172800]
Ideally, Streams should include grace period when implicitly setting window 
retention.


> Default window retention does not consider grace period
> ---
>
> Key: KAFKA-9524
> URL: https://issues.apache.org/jira/browse/KAFKA-9524
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Priority: Minor
>
> In a windowed aggregation, if you specify a window size larger than the 
> default window retention (1 day), Streams will implicitly set retention 
> accordingly to accommodate windows of that size. For example, 
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20))) 
> {code}
> In this case, Streams will implicitly set window retention to 20 days, and no 
> exceptions will occur.
> However, if you also include a non-zero grace period on the window, such as:
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
> {code}
> In this case, Streams will still implicitly set the window retention 20 days 
> (not 20 days + 5 minutes grace), and an exception will be thrown:
>  Exception in thread "main" java.lang.IllegalArgumentException: The retention 
> period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
> than its window size plus the grace period. Got size=[172800], 
> grace=[30], retention=[172800]
> Ideally, Streams should include grace period when implicitly setting window 
> retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9524) Default window retention does not consider grace period

2020-02-07 Thread Michael Bingham (Jira)
Michael Bingham created KAFKA-9524:
--

 Summary: Default window retention does not consider grace period
 Key: KAFKA-9524
 URL: https://issues.apache.org/jira/browse/KAFKA-9524
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.0
Reporter: Michael Bingham


In a windowed aggregation, if you specify a window size larger than the default 
window retention (1 day), Streams will implicitly set retention accordingly to 
accommodate windows of that size. For example,

 
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20))) 
{code}
In this case, Streams will implicitly set window retention to 20 days, and no 
exceptions will occur.

However, if you also include a non-zero grace period on the window, such as:

 
{code:java}
.windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
{code}
In this case, Streams will still implicitly set the window retention 20 days 
(not 20 days + 5 minutes grace), and an exception will be thrown:
Exception in thread "main" java.lang.IllegalArgumentException: The retention 
period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
than its window size plus the grace period. Got size=[172800], 
grace=[30], retention=[172800]
Ideally, Streams should include grace period when implicitly setting window 
retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9523) Reduce flakiness of BranchedMultiLevelRepartitionConnectedTopologyTest

2020-02-07 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9523:
--

 Summary: Reduce flakiness of 
BranchedMultiLevelRepartitionConnectedTopologyTest
 Key: KAFKA-9523
 URL: https://issues.apache.org/jira/browse/KAFKA-9523
 Project: Kafka
  Issue Type: Test
Reporter: Boyang Chen


KAFKA-9335 introduces an integration test to verify that the topology builder itself 
can survive building a complex topology. This test is sometimes flaky because of the 
stream client to broker connection, so we should consider making it less 
flaky by either converting it to a unit test or just focusing on making the test 
logic more robust.
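
For illustration, a rough sketch of the unit-test direction (the helper name below is hypothetical and stands in for whatever builds the topology under test): constructing a TopologyTestDriver exercises topology building and task initialization without any broker connection.
{code:java}
// Sketch only; buildBranchedMultiLevelRepartitionTopology() is a hypothetical helper.
final Topology topology = buildBranchedMultiLevelRepartitionTopology();
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-build-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the driver
final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
// constructing the driver is enough to prove the topology builds and initializes
driver.close();
{code}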



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032689#comment-17032689
 ] 

ASF GitHub Bot commented on KAFKA-9509:
---

hachikuji commented on pull request #8048: KAFKA-9509: Fixing flakiness of 
MirrorConnectorsIntegrationTest.testReplication
URL: https://github.com/apache/kafka/pull/8048
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix flaky test MirrorConnectorsIntegrationTest.testReplication
> --
>
> Key: KAFKA-9509
> URL: https://issues.apache.org/jira/browse/KAFKA-9509
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.4.1, 2.5.0
>Reporter: Sanjana Kaundinya
>Assignee: Sanjana Kaundinya
>Priority: Major
> Fix For: 2.5.0
>
>
> The test 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication
>  is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of 
> when the connectors and tasks are started up. The fix for this would make it 
> such that when the connectors are started up, to wait until the REST endpoint 
> returns a positive number of tasks to be confident that we can start testing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9248) Foreign key join does not pickup default serdes and dies with NPE

2020-02-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9248.

Resolution: Duplicate

Agreed. Closing this ticket as duplicate.

> Foreign key join does not pickup default serdes and dies with NPE
> -
>
> Key: KAFKA-9248
> URL: https://issues.apache.org/jira/browse/KAFKA-9248
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> The foreign key join operator only works if `Serdes` are passed in by the 
> user via corresponding API methods.
> If one tries to fall back to default Serdes from `StreamsConfig` the operator 
> does not pick up those Serdes but dies with an NPE.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9518) NullPointerException on out-of-order topologies

2020-02-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032663#comment-17032663
 ] 

Matthias J. Sax commented on KAFKA-9518:


Thanks for digging out the other Jiras, [~vvcephei] – can we properly link all of 
those as "related" so they show up at the top? They get lost easily in the comments. 
It's still unclear to me to what extent we can fix this, though...

> NullPointerException on out-of-order topologies
> ---
>
> Key: KAFKA-9518
> URL: https://issues.apache.org/jira/browse/KAFKA-9518
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1, 2.4.0, 2.3.1
>Reporter: Murilo Tavares
>Priority: Minor
> Attachments: kafka-streams-testing.zip
>
>
> I have a KafkaStreams that dynamically builds a topology based on a Map of 
> input-to-output topics. Since the map was not sorted, iteration was 
> unpredictable, and different instances could have different orders. When this 
> happens, KafkaStreams throws an exception during REBALANCE.
>  
> I was able to reproduce this using the attached java project. The project is 
> a pretty simple Maven project with one class. It starts 2 instances in 
> parallel, with the same input-to-output topics, but one instance takes the 
> topics in a reversed order.
>  
> The exception is this:
> {noformat}
> Exception in thread 
> "MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: stream-thread 
> [MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1] Failed to 
> rebalance.
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:234)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:176)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
> ... 3 more{noformat}
>  
> The topology for both instances:
> {code:java}
> // instance1
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [topicA])
>       --> KSTREAM-SINK-01
>     Sink: KSTREAM-SINK-01 (topic: topicA-repartitioned)
>       <-- KSTREAM-SOURCE-00
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-02 (topics: [topicB])
>       --> KSTREAM-SINK-03
>     Sink: KSTREAM-SINK-03 (topic: topicB-repartitioned)
>       <-- KSTREAM-SOURCE-02
> // instance2
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [topicB])
>       --> KSTREAM-SINK-01
>     Sink: KSTREAM-SINK-01 (topic: topicB-repartitioned)
>       <-- KSTREAM-SOURCE-00
>   Sub-topology: 1
>     Source: KSTREAM-SOURCE-02 (topics: [topicA])
>       --> KSTREAM-SINK-03
>     Sink: KSTREAM-SINK-03 (topic: 

[jira] [Commented] (KAFKA-9498) Topic validation during the creation trigger unnecessary TopicChange events

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032654#comment-17032654
 ] 

ASF GitHub Bot commented on KAFKA-9498:
---

dajac commented on pull request #8062: KAFKA-9498; Topic validation during the 
creation trigger unnecessary TopicChange events
URL: https://github.com/apache/kafka/pull/8062
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Topic validation during the creation trigger unnecessary TopicChange events 
> 
>
> Key: KAFKA-9498
> URL: https://issues.apache.org/jira/browse/KAFKA-9498
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> I have found out that the topic validation logic, which is executed when a 
> CreateTopicPolicy is configured or when validateOnly is set, triggers unnecessary 
> TopicChange events in the controller. In the worst case, it can trigger up to 
> one event per created topic and lead to overloading the controller.
> This happens because the validation logic reads all the topics from ZK using 
> the method getAllTopicsInCluster provided by the KafkaZkClient. This method 
> registers a watch every time the topics are read from ZooKeeper.
> I think that we should make the watch registration optional for this call in 
> order to avoid this unwanted behaviour.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9248) Foreign key join does not pickup default serdes and dies with NPE

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032632#comment-17032632
 ] 

John Roesler commented on KAFKA-9248:
-

Hey [~mjsax], I just saw this ticket. I think it's the same as 
https://issues.apache.org/jira/browse/KAFKA-9517,  do you agree?

If so, I'd close this one as a duplicate, since 9517 already has a PR.

> Foreign key join does not pickup default serdes and dies with NPE
> -
>
> Key: KAFKA-9248
> URL: https://issues.apache.org/jira/browse/KAFKA-9248
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> The foreign key join operator only works if `Serdes` are passed in by the 
> user via corresponding API methods.
> If one tries to fall back to default Serdes from `StreamsConfig` the operator 
> does not pick up those Serdes but dies with an NPE.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On

2020-02-07 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-9517:
---

Assignee: John Roesler

> KTable Joins Without Materialized Argument Yield Results That Further Joins 
> NPE On
> --
>
> Key: KAFKA-9517
> URL: https://issues.apache.org/jira/browse/KAFKA-9517
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Paul Snively
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.5.0, 2.4.1
>
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
>  calls `doJoinOnForeignKey` with an argument of 
> `Materialized.with(null, null)`, as apparently do several other APIs. As the 
> comment spanning these lines (#L1098-L1099) makes clear, the result is a 
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, 
> attempts to `join` etc. on the resulting `KTable` fail with a 
> `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.
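
For reference, a minimal sketch of the explicit-Materialized workaround mentioned above (topics, types, extractor, and serdes are placeholders, not from the ticket):
{code:java}
// Sketch only: passing serdes explicitly side-steps the null valueSerde, since the
// foreign-key join does not fall back to the default serdes from StreamsConfig here.
KTable<String, String> left = builder.table("left-topic");
KTable<String, String> right = builder.table("right-topic");

KTable<String, String> joined = left.join(
    right,
    leftValue -> leftValue,                            // placeholder foreign-key extractor
    (leftValue, rightValue) -> leftValue + rightValue, // placeholder joiner
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), Serdes.String()));
{code}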



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032630#comment-17032630
 ] 

John Roesler commented on KAFKA-9517:
-

Hi [~psnively], thanks for your kind words.

Actually, the 2.4.1 release is currently in progress. Fortunately, we caught 
these issues early enough to have them included as blockers for the release. 
The release plan is here: 
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.4.1
Of course, it cannot progress until the blockers are resolved, which is the 
biggest wild-card in the timeline. As soon as the blockers are cleared, Bill 
(who volunteered to drive the release) will be able to give a better estimate 
about the timeline.

As you said, the biggest thing you can do to help is to check out the PRs and 
test them. I think you'll need both #8015 and #8061. As per Apache Kafka 
policies, the PRs are actually based on trunk, so you'll want to squash them 
and cherry-pick them onto 2.4 to get an accurate proxy for 2.4.1. This is 
actually a huge help, since even extensive testing can have subtle but 
important gaps (which is how we wound up with these bugs to begin with).

The other big thing you can do if you have time is to review the PRs. You've 
already become familiar enough with the code to identify the root cause even 
before I saw it, and a fresh perspective is always helpful.

Thanks again,
-John

> KTable Joins Without Materialized Argument Yield Results That Further Joins 
> NPE On
> --
>
> Key: KAFKA-9517
> URL: https://issues.apache.org/jira/browse/KAFKA-9517
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Paul Snively
>Priority: Blocker
> Fix For: 2.5.0, 2.4.1
>
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
>  calls `doJoinOnForeignKey` with an argument of 
> `Materialized.with(null, null)`, as apparently do several other APIs. As the 
> comment spanning these lines (#L1098-L1099) makes clear, the result is a 
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, 
> attempts to `join` etc. on the resulting `KTable` fail with a 
> `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2020-02-07 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032625#comment-17032625
 ] 

Randall Hauch commented on KAFKA-7052:
--

Any change to an API requires a KIP, even if it's a backward compatible change 
(which most are).
{quote}"fail" for records with a schema (it's raising an 
{{IllegalArgumentException}} but could be NPE of course if the exact exception 
type is a concern; I think it shouldn't, as the original NPE really is a 
weakness of the existing implementation that shouldn't be relied upon
{quote}
I'm not sure that the type of exception matters, and IAE would be more clear.

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On

2020-02-07 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9517:
---
Fix Version/s: 2.4.1
   2.5.0

> KTable Joins Without Materialized Argument Yield Results That Further Joins 
> NPE On
> --
>
> Key: KAFKA-9517
> URL: https://issues.apache.org/jira/browse/KAFKA-9517
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Paul Snively
>Priority: Critical
> Fix For: 2.5.0, 2.4.1
>
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
>  calls `doJoinOnForeignKey` with an argument of 
> `Materialized.with(null, null)`, as apparently do several other APIs. As the 
> comment spanning these lines (#L1098-L1099) makes clear, the result is a 
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, 
> attempts to `join` etc. on the resulting `KTable` fail with a 
> `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On

2020-02-07 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-9517:
---
Priority: Blocker  (was: Critical)

> KTable Joins Without Materialized Argument Yield Results That Further Joins 
> NPE On
> --
>
> Key: KAFKA-9517
> URL: https://issues.apache.org/jira/browse/KAFKA-9517
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Paul Snively
>Priority: Blocker
> Fix For: 2.5.0, 2.4.1
>
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
>  calls `doJoinOnForeignKey` with an argument of 
> `Materialized.with(null, null)`, as apparently do several other APIs. As the 
> comment spanning these lines (#L1098-L1099) makes clear, the result is a 
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, 
> attempts to `join` etc. on the resulting `KTable` fail with a 
> `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032616#comment-17032616
 ] 

ASF GitHub Bot commented on KAFKA-9517:
---

vvcephei commented on pull request #8061: KAFKA-9517: Fix default serdes with 
FK join
URL: https://github.com/apache/kafka/pull/8061
 
 
   During the KIP-213 implementation and verification, we neglected to test the 
code path for falling back to default serdes if none are given in the topology.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KTable Joins Without Materialized Argument Yield Results That Further Joins 
> NPE On
> --
>
> Key: KAFKA-9517
> URL: https://issues.apache.org/jira/browse/KAFKA-9517
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Paul Snively
>Priority: Critical
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
>  calls `doJoinOnForeignKey` with an argument of 
> `Materialized.with(null, null)`, as apparently do several other APIs. As the 
> comment spanning these lines (#L1098-L1099) makes clear, the result is a 
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, 
> attempts to `join` etc. on the resulting `KTable` fail with a 
> `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032604#comment-17032604
 ] 

Evan Williams commented on KAFKA-4084:
--

[~sql_consulting]

That would be great! We are running the latest Confluent 5.4. However, ZooKeeper 
is whatever is bundled with 5.3 currently.


Another option I was thinking about is separating client, replication, and 
controller traffic via multiple NICs. At the moment, we just have one 
listener/advertised listener and one NIC per broker. I guess this would also 
guarantee that even if threads get exhausted for replication traffic, client 
traffic would be unaffected?
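
For what it's worth, a minimal sketch of that kind of listener split (hostnames, IPs, and ports are placeholders; each listener can be bound to a different NIC):
{code:java}
# broker server.properties sketch: separate client, replication, and controller traffic
listeners=CLIENT://10.0.1.10:9092,REPLICATION://10.0.2.10:9093,CONTROLLER://10.0.3.10:9094
advertised.listeners=CLIENT://broker1.example.com:9092,REPLICATION://broker1-repl.example.com:9093,CONTROLLER://broker1-ctrl.example.com:9094
listener.security.protocol.map=CLIENT:PLAINTEXT,REPLICATION:PLAINTEXT,CONTROLLER:PLAINTEXT
inter.broker.listener.name=REPLICATION
control.plane.listener.name=CONTROLLER
{code}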

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032598#comment-17032598
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror] 

Which Kafka version are you running? Maybe I can provide a diff of the 
KIP-491 changes for you to apply at your end to try it out.

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032577#comment-17032577
 ] 

ASF GitHub Bot commented on KAFKA-9274:
---

guozhangwang commented on pull request #8060: KAFKA-9274: Gracefully handle 
timeout exception [WIP]
URL: https://github.com/apache/kafka/pull/8060
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Gracefully handle timeout exceptions on Kafka Streams
> -
>
> Key: KAFKA-9274
> URL: https://issues.apache.org/jira/browse/KAFKA-9274
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> Right now Streams does not treat timeout exceptions as retriable in general, but 
> instead throws them to the application level. If not handled by the user, this 
> kills the stream thread, unfortunately.
> In fact, timeouts happen mostly due to network issues or server-side 
> unavailability. A hard failure on the client seems to be overkill.
> We would like to discuss the best practice for handling timeout 
> exceptions in Streams. The current state is still brainstorming, and this ticket 
> consolidates all the cases that involve timeout exceptions.
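
For context, a minimal sketch of what applications typically do today (topology and props are placeholders); note the handler only observes the thread's death, it cannot retry on its own:
{code:java}
// Sketch only: an unhandled org.apache.kafka.common.errors.TimeoutException currently kills
// the stream thread, so at minimum applications register an uncaught-exception handler.
KafkaStreams streams = new KafkaStreams(topology, props);
streams.setUncaughtExceptionHandler((thread, throwable) -> {
    if (throwable instanceof TimeoutException || throwable.getCause() instanceof TimeoutException) {
        // log, alert, or trigger an application-level restart policy here
    }
});
streams.start();
{code}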



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2020-02-07 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032552#comment-17032552
 ] 

Gunnar Morling commented on KAFKA-7052:
---

Thanks for commenting, [~rhauch]! My intention was to actually keep the current 
behavior by means of the default setting of the new option:

* "fail" for records with a schema (it's raising an 
{{IllegalArgumentException}} but could be NPE of course if the exact exception 
type is a concern; I think it shouldn't, as the original NPE really is a 
weakness of the existing implementation that shouldn't be relied upon
* "return-null" for records without schema

I.e. without any explicit setting, the behavior will be exactly the same as 
today (ignoring the changed exception type). That's why I didn't assume that'd 
need a KIP, but as per what you're saying, any new option mandates a KIP?
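
To make that concrete, a sketch of what such a configuration might look like; the option name used below is purely hypothetical (it is not an existing Connect or ExtractField config) and only illustrates the two settings described above:
{code:java}
"transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractId.field": "id",
// hypothetical new option, defaulting to today's behavior:
"transforms.ExtractId.behavior.on.missing.field": "fail"
{code}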

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility

2020-02-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032525#comment-17032525
 ] 

Guozhang Wang commented on KAFKA-8307:
--

Thanks for the summary [~vvcephei]. I agree with you that having a verification 
mechanism for whether two topologies are compatible or equal would be valuable. 
Besides that, though, I'm thinking that Streams itself should be robust to how the 
user code determines the ordering of the operators (and more importantly, their 
naming suffixes) -- since we now have an internal logical 
representation of the topology before generating the physical Topology, we 
should make the generation process "deterministic", such that no matter 
whether you write:

stream1.join(stream2)
stream1.groupBy().aggregate()

OR

stream1.groupBy().aggregate()
stream1.join(stream2)

the generated topology would be the same in terms of the operator ordering 
(today it would be different).
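
Until something like that lands, a minimal sketch of the existing user-side mitigation (topics, joiner, aggregator, and store names are placeholders; serdes assumed from config): naming the stateful operators explicitly, so the state store and repartition topic names no longer depend on the order-sensitive generated suffixes:
{code:java}
// Sketch only: explicit names via StreamJoined/Grouped/Materialized keep store and
// repartition topic names stable even if the DSL calls are reordered, unlike the
// auto-generated "-000000000N" suffixes.
KStream<String, String> stream1 = builder.stream("topic-1");
KStream<String, String> stream2 = builder.stream("topic-2");

stream1.join(stream2,
             (v1, v2) -> v1 + v2,
             JoinWindows.of(Duration.ofMinutes(5)),
             StreamJoined.as("orders-join"));

stream1.groupByKey(Grouped.as("orders-grouping"))
       .aggregate(() -> "",
                  (key, value, agg) -> agg + value,
                  Materialized.as("orders-agg-store"));
{code}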


> Kafka Streams should provide some mechanism to determine topology equality 
> and compatibility
> 
>
> Key: KAFKA-8307
> URL: https://issues.apache.org/jira/browse/KAFKA-8307
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: user-experience
>
> Currently, Streams provides no mechanism to compare two topologies. This is a 
> common operation when users want to have tests verifying that they don't 
> accidentally alter their topology. They would save the known-good topology 
> and then add a unit test verifying the current code against that known-good 
> state.
> However, because there's no way to do this comparison properly, everyone is 
> reduced to using the string format of the topology (from 
> `Topology#describe().toString()`). The major drawback is that the string 
> format is meant for human consumption. It is neither machine-parseable nor 
> stable. So, these compatibility tests are doomed to fail when any minor, 
> non-breaking, change is made either to the application, or to the library. 
> This trains everyone to update the test whenever it fails, undermining its 
> utility.
> We should fix this problem, and provide both a mechanism to serialize the 
> topology and to compare two topologies for compatibility. All in all, I think 
> we need:
> # a way to serialize/deserialize topology structure in a machine-parseable 
> format that is future-compatible. Offhand, I'd recommend serializing the 
> topology structure as JSON, and establishing a policy that attributes should 
> only be added to the object graph, never removed. Note, it's out of scope to 
> be able to actually run a deserialized topology; we only want to save and 
> load the structure (not the logic) to facilitate comparisons.
> # a method to verify the *equality* of two topologies... This method tells 
> you that the two topologies are structurally identical. We can't know if the 
> logic of any operator has changed, only if the structure of the graph is 
> changed. We can consider whether other graph properties, like serdes, should 
> be included.
> # a method to verify the *compatibility* of two topologies... This method 
> tells you that moving from topology A to topology B does not require an 
> application reset. Note that this operation is not commutative: 
> `A.compatibleWith(B)` does not imply `B.compatibleWith(A)`. We can discuss 
> whether `A.compatibleWith(B) && B.compatibleWith(A)` implies `A.equals(B)` (I 
> think not necessarily, because we may want "equality" to be stricter than 
> "compatibility").



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On

2020-02-07 Thread Paul Snively (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032520#comment-17032520
 ] 

Paul Snively commented on KAFKA-9517:
-

Speaking of things I can do: I am downloading PR #8015 as a patch, and will 
apply it locally and build the appropriate `.jar`s. We will then attempt to 
reproduce the issues we've seen with that PR applied, and report back.

> KTable Joins Without Materialized Argument Yield Results That Further Joins 
> NPE On
> --
>
> Key: KAFKA-9517
> URL: https://issues.apache.org/jira/browse/KAFKA-9517
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Paul Snively
>Priority: Critical
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
>  calls `doJoinOnForeignKey` with an argument of 
> `Materialized.with(null, null)`, as apparently do several other APIs. As the 
> comment spanning these lines (#L1098-L1099) makes clear, the result is a 
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, 
> attempts to `join` etc. on the resulting `KTable` fail with a 
> `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2020-02-07 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032514#comment-17032514
 ] 

Randall Hauch commented on KAFKA-7052:
--

On second thought, I'm not sure we can change the behavior without risking 
backward-incompatible changes.

First, the current behavior when the SMT can't be applied is to fail with an 
NPE. We try to use meaningful exceptions with useful error messages, and the 
lack of any code to check for this situation likely means this case was simply 
not considered and throwing an NPE is unintentional. This is an argument for 
changing the behavior to skip any record for which the specified field is not 
found.

Second, it's probably not practical for users to rely upon this existing NPE 
behavior when used with a +source+ connector, since that would leave the 
connector in a failed state without advancing offsets. Essentially, the 
connector would be stuck and unable to continue unless the configuration is 
changed.

However, if someone were to use this SMT with a +sink+ connector and use the 
DLQ functionality, they might be relying upon the NPE to signal that the record 
should go to the DLQ. If we were to change the behavior, the record would no 
longer go to the DLQ and instead would get sent to any subsequent 
transformation and ultimately to the sink connector.

Therefore, changing to have the SMT skip any record for which the field is not 
found is technically not a backward compatible change, and this would require a 
KIP or, better yet, require introducing a new configuration property (as 
mentioned in my previous comment) that would default to "fail" to maintain 
backward compatibility.

Thoughts?

 

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8810) Add mechanism to detect topology mismatch between streams instances

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032509#comment-17032509
 ] 

John Roesler commented on KAFKA-8810:
-

This issue has been lurking for a while, and has been reported a number of 
different ways. It seems to take two forms:
1. changing the topology at all (in apparently compatible ways) can renumber 
operators and corrupt the application upon restart
2. changing the topology in combination with a rolling bounce results in 
members executing a different topology than the leader, which leads to extra 
problems (such as NPEs)

https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be 
more about just changing the topology at all
https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix
https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of 
KAFKA-8307
and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that 
results from a rolling-bounce topology change.

> Add mechanism to detect topology mismatch between streams instances
> ---
>
> Key: KAFKA-8810
> URL: https://issues.apache.org/jira/browse/KAFKA-8810
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Vinoth Chandar
>Priority: Major
>
> Noticed this while reading through the StreamsPartitionAssignor related code. 
> If an user accidentally deploys a different topology on one of the instances, 
> there is no mechanism to detect this and refuse assignment/take action. Given 
> Kafka Streams is designed as an embeddable library, I feel this is rather an 
> important scenario to handle. For e.g, kafka streams is embedded into a web 
> front end tier and operators deploy a hot fix for a site issue to a few 
> instances that are leaking memory and that accidentally also deploys some 
> topology changes with it. 
> Please feel free to close the issue, if its a duplicate. (Could not find a 
> ticket for this) 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032511#comment-17032511
 ] 

John Roesler commented on KAFKA-8307:
-

This issue has been lurking for a while, and has been reported a number of 
different ways. It seems to take two forms:
1. changing the topology at all (in apparently compatible ways) can renumber 
operators and corrupt the application upon restart
2. changing the topology in combination with a rolling bounce results in 
members executing a different topology than the leader, which leads to extra 
problems (such as NPEs)

https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be 
more about just changing the topology at all
https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix
https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of 
KAFKA-8307
and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that 
results from a rolling-bounce topology change.

> Kafka Streams should provide some mechanism to determine topology equality 
> and compatibility
> 
>
> Key: KAFKA-8307
> URL: https://issues.apache.org/jira/browse/KAFKA-8307
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: user-experience
>
> Currently, Streams provides no mechanism to compare two topologies. This is a 
> common operation when users want to have tests verifying that they don't 
> accidentally alter their topology. They would save the known-good topology 
> and then add a unit test verifying the current code against that known-good 
> state.
> However, because there's no way to do this comparison properly, everyone is 
> reduced to using the string format of the topology (from 
> `Topology#describe().toString()`). The major drawback is that the string 
> format is meant for human consumption. It is neither machine-parseable nor 
> stable. So, these compatibility tests are doomed to fail when any minor, 
> non-breaking, change is made either to the application, or to the library. 
> This trains everyone to update the test whenever it fails, undermining its 
> utility.
> We should fix this problem, and provide both a mechanism to serialize the 
> topology and to compare two topologies for compatibility. All in all, I think 
> we need:
> # a way to serialize/deserialize topology structure in a machine-parseable 
> format that is future-compatible. Offhand, I'd recommend serializing the 
> topology structure as JSON, and establishing a policy that attributes should 
> only be added to the object graph, never removed. Note, it's out of scope to 
> be able to actually run a deserialized topology; we only want to save and 
> load the structure (not the logic) to facilitate comparisons.
> # a method to verify the *equality* of two topologies... This method tells 
> you that the two topologies are structurally identical. We can't know if the 
> logic of any operator has changed, only if the structure of the graph is 
> changed. We can consider whether other graph properties, like serdes, should 
> be included.
> # a method to verify the *compatibility* of two topologies... This method 
> tells you that moving from topology A to topology B does not require an 
> application reset. Note that this operation is not commutative: 
> `A.compatibleWith(B)` does not imply `B.compatibleWith(A)`. We can discuss 
> whether `A.compatibleWith(B) && B.compatibleWith(A)` implies `A.equals(B)` (I 
> think not necessarily, because we may want "equality" to be stricter than 
> "compatibility").



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9518) NullPointerException on out-of-order topologies

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032508#comment-17032508
 ] 

John Roesler commented on KAFKA-9518:
-

This issue has been lurking for a while, and has been reported a number of 
different ways. It seems to take two forms:
1. changing the topology at all (in apparently compatible ways) can renumber 
operators and corrupt the application upon restart
2. changing the topology in combination with a rolling bounce results in 
members executing a different topology than the leader, which leads to extra 
problems (such as NPEs)

https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be 
more about just changing the topology at all
https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix
https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of 
KAFKA-8307
and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that 
results from a rolling-bounce topology change.


> NullPointerException on out-of-order topologies
> ---
>
> Key: KAFKA-9518
> URL: https://issues.apache.org/jira/browse/KAFKA-9518
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1, 2.4.0, 2.3.1
>Reporter: Murilo Tavares
>Priority: Minor
> Attachments: kafka-streams-testing.zip
>
>
> I have a KafkaStreams application that dynamically builds a topology based on a Map of 
> input-to-output topics. Since the map was not sorted, iteration was 
> unpredictable, and different instances could have different orders. When this 
> happens, KafkaStreams throws an exception during REBALANCE.
>  
> I was able to reproduce this using the attached java project. The project is 
> a pretty simple Maven project with one class. It starts 2 instances in 
> parallel, with the same input-to-output topics, but one instance takes the 
> topics in a reversed order.
>  
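
(For illustration, a minimal sketch of the obvious workaround, namely forcing a deterministic iteration order before building the topology; the class and method names are made up and this is not taken from the attached project:)

{code:java}
import java.util.Map;
import java.util.TreeMap;

import org.apache.kafka.streams.StreamsBuilder;

// Hypothetical workaround: iterate the input-to-output mapping in a sorted,
// deterministic order so every instance builds the same topology and ends up
// with the same generated processor names.
public final class DeterministicTopology {

    public static void addPipelines(final StreamsBuilder builder,
                                    final Map<String, String> inputToOutput) {
        // TreeMap sorts by input topic name, making iteration order stable
        // across instances regardless of how the map was populated.
        new TreeMap<>(inputToOutput).forEach((input, output) ->
            builder.stream(input).to(output));
    }
}
{code}
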
> The exception is this:
> {noformat}
> Exception in thread 
> "MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: stream-thread 
> [MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1] Failed to 
> rebalance.
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:234)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:176)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
> ... 3 more{noformat}
>  
> The topology for both instances:
> {code:java}
> // instance1
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-00 (topics: [topicA])
>       --> KSTREAM-SINK-01
>     Sink: KSTREAM-SINK-01 (topic: topicA-repartitioned)
>       <-- KSTREAM-SOURCE-00
>   Sub-topology: 1
>  

[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes

2020-02-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032512#comment-17032512
 ] 

John Roesler commented on KAFKA-7669:
-

This issue has been lurking for a while, and has been reported a number of 
different ways. It seems to take two forms:
1. changing the topology at all (in apparently compatible ways) can renumber 
operators and corrupt the application upon restart
2. changing the topology in combination with a rolling bounce results in 
members executing a different topology than the leader, which leads to extra 
problems (such as NPEs)

https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be 
more about just changing the topology at all
https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix
https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of 
KAFKA-8307
and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that 
results from a rolling-bounce topology change.

> Stream topology definition is not robust to the ordering changes
> 
>
> Key: KAFKA-7669
> URL: https://issues.apache.org/jira/browse/KAFKA-7669
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> It seems that if the user does not guarantee the order of the stream topology 
> definition, he may end up with multiple stream branches having the same 
> internal changelog (and repartition, if created) topic. 
> Let's assume:
> {code:java}
> val initialStream = new StreamsBuilder().stream(sth);
> val someStrings = (1 to 10).map(_.toString)
> val notGuaranteedOrderOfStreams: Map[String, KStream[...]] = 
> someStrings.map(s => s -> initialStream.filter(...)).toMap{code}
> When the user now defines common aggregation logic for 
> notGuaranteedOrderOfStreams and runs multiple instances of the application, 
> the KSTREAM-AGGREGATE-STATE-STORE topic names will not be unique and will 
> contain results of the different streams from the notGuaranteedOrderOfStreams map.
> All of this without a single warning that the topology (or just the order of 
> the topology definition) differs between different instances of the Kafka Streams 
> application.
> Also, I am concerned that the ids in "KSTREAM-AGGREGATE-STATE-STORE-id-changelog" 
> match so well for the different application instances (and different 
> topologies).
>  
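
For illustration, a hedged sketch of one mitigation: give the repartition topic and the state store explicit names so they no longer depend on the generated numbering. The class and the chosen names below are hypothetical:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// Hypothetical mitigation: explicit names decouple the repartition topic and
// the state store from the generated KSTREAM-AGGREGATE-STATE-STORE-<id>
// numbering, which shifts when the topology is built in a different order.
public final class ExplicitNames {

    public static KTable<String, Long> countPerKey(final StreamsBuilder builder,
                                                   final String topic,
                                                   final String name) {
        final KStream<String, String> stream = builder.stream(topic); // serdes from config
        return stream
            .groupByKey(Grouped.as(name + "-grouped"))   // names the repartition topic, if one is created
            .count(Materialized.as(name + "-store"));    // names the store and its changelog topic
    }
}
{code}
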



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2020-02-07 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032488#comment-17032488
 ] 

Randall Hauch commented on KAFKA-7052:
--

Thanks for the discussion, everyone. As soon as we introduce a configuration 
option on a Connect-provided SMT, we need a KIP and we can't backport the 
change.

I would suggest the following:
 # One PR to improve the error message per [~rmoff]'s earlier comment (e.g., 
"ExtractField : field 'id' not found in Key") but otherwise not change the 
behavior. Or, we make the message better and we decide that "skip" is an 
appropriate fix rather than an NPE; this may need more discussion, but I think 
NPE is a bad UX and probably rarely useful. This would be able to be merged on 
trunk and backported to 2-3 branches (standard practice).
 # Create a new Jira issue and small KIP and another PR to add the proposed 
ExtractField configuration property, such as 
"{{behavior.on.non.existant.field}} = {{(fail|drop|skip)}}". Note I'm 
suggesting using "skip" rather than "passon" to keep things simple. 
[~gunnar.morling]'s PR would apply to this new Jira issue and KIP rather than 
to this issue.
 # Create a new Jira issue and KIP to define an optional topic filter property 
for each transformation that would specify a regex pattern matching the topics 
to which the SMT should apply, and defaulting to ".*" to maintain the current 
behavior. I'm not sure how feasible this will be to ensure it never clashes 
with SMT-specific properties, but it's worth investigating. 

Important: We've already missed the KIP acceptance deadline for AK 2.5, so the 
earliest #2 and #3 could appear is AK 2.6.

WDYT?
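
For reference, a rough sketch of what item 1 could look like; the helper below is illustrative only and not the actual ExtractField internals:

{code:java}
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

// Hypothetical helper: fail with a descriptive DataException instead of an
// NPE when the configured field is missing. Not the actual ExtractField
// internals, just the shape of the check.
final class FieldLookup {

    static Object requireField(final Struct struct, final String fieldName, final String keyOrValue) {
        final Schema schema = struct.schema();
        final Field field = schema.field(fieldName);
        if (field == null) {
            throw new DataException("ExtractField: field '" + fieldName + "' not found in "
                + keyOrValue + "; available fields: " + schema.fields());
        }
        return struct.get(field);
    }
}
{code}
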

> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9177) Pause completed partitions on restore consumer

2020-02-07 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9177.
--
Fix Version/s: 2.6.0
 Assignee: Guozhang Wang
   Resolution: Fixed

As part of KAFKA-9113 fix, we will pause the restore consumer once the 
corresponding partition has completed restoration, so I'm resolving this ticket 
now.

> Pause completed partitions on restore consumer
> --
>
> Key: KAFKA-9177
> URL: https://issues.apache.org/jira/browse/KAFKA-9177
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> The StoreChangelogReader is responsible for tracking and restoring active 
> tasks, but once a store has finished restoring it will continue polling for 
> records on that partition.
> Ordinarily this doesn't make a difference as a store is not completely 
> restored until its entire changelog has been read, so there are no more 
> records for poll to return anyway. But if the restoring state is actually an 
> optimized source KTable, the changelog is just the source topic and poll will 
> keep returning records for that partition until all stores have been restored.
> Note that this isn't a correctness issue since it's just the restore 
> consumer, but it is wasteful to be polling for records and throwing them 
> away. We should pause completed partitions in StoreChangelogReader so we 
> don't slow down the restore consumer in reading from the unfinished changelog 
> topics, and avoid wasted network.
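
For illustration, a simplified sketch of the idea (this is not the actual StoreChangelogReader code; the helper name and bookkeeping maps are made up):

{code:java}
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Simplified illustration: once a changelog partition is restored up to its
// end offset, pause it on the restore consumer so poll() no longer returns
// records for it.
final class RestorePauser {

    static void pauseCompleted(final Consumer<byte[], byte[]> restoreConsumer,
                               final Collection<TopicPartition> assigned,
                               final Map<TopicPartition, Long> restoredOffsets,
                               final Map<TopicPartition, Long> endOffsets) {
        final Set<TopicPartition> completed = assigned.stream()
            .filter(tp -> restoredOffsets.getOrDefault(tp, 0L)
                              >= endOffsets.getOrDefault(tp, Long.MAX_VALUE))
            .collect(Collectors.toSet());
        if (!completed.isEmpty()) {
            restoreConsumer.pause(completed); // Consumer#pause is the regular client API
        }
    }
}
{code}
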



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On

2020-02-07 Thread Paul Snively (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032426#comment-17032426
 ] 

Paul Snively commented on KAFKA-9517:
-

[~vvcephei], that's great news! First, let me thank you again for your prompt 
attention. It's done a great deal to restore my confidence in Kafka Streams. 
Second, and I hate to ask because I know it's a big project with many other 
customers with various needs, but do you happen to have some idea when a 2.4.1 
might be available with fixes for these, and is there anything I can do to help 
with the process?

Thanks again!

> KTable Joins Without Materialized Argument Yield Results That Further Joins 
> NPE On
> --
>
> Key: KAFKA-9517
> URL: https://issues.apache.org/jira/browse/KAFKA-9517
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Paul Snively
>Priority: Critical
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
>  calls `doJoinOnForeignKey` with an argument of 
> `Materialized.with(null, null)`, as apparently do several other APIs. As the 
> comment spanning [these lines|#L1098-L1099] makes clear, the result is a 
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, 
> attempts to `join` etc. on the resulting `KTable` fail with a 
> `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.
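
For reference, a sketch of the workaround mentioned above, assuming String keys and values purely for illustration:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Workaround sketch: pass a Materialized with explicit serdes to the
// foreign-key join so the resulting KTable has a non-null valueSerde and can
// be joined again. The extractor and joiner are just placeholders.
final class FkJoinWorkaround {

    static KTable<String, String> join(final KTable<String, String> left,
                                       final KTable<String, String> right) {
        return left.join(
            right,
            leftValue -> leftValue,                                  // foreign-key extractor
            (leftValue, rightValue) -> leftValue + "|" + rightValue, // value joiner
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("fk-join-result")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
    }
}
{code}
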



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032285#comment-17032285
 ] 

Evan Williams edited comment on KAFKA-4084 at 2/7/20 1:07 PM:
--

[~sql_consulting]

We are using min.insync.replicas=1, and have replication.factor=3 or above for 
most topics (6 brokers).
 As a side note, one interesting thing I've now seen reported by the owners 
of the clients (streams) is that, for certain topics/partitions, they had no 
leader elected, even though there was a clean shutdown of the bootstrapping broker. 
So something is quite weird there. What might cause that?

 

But yes, I think there is a clear case for KIP-491 in this scenario: just 
blacklist a broker from becoming leader until factor x is satisfied, or it is 
manually removed.

 


was (Author: blodsbror):
[~sql_consulting]

We are using min.insync.replicas=1.  And have replication.factor=3 or above for 
most topics (6 brokers).
 As a side note, one interesting thing I've seen reported now from the owners 
of the clients (streams) is that, for certain topics/partitions - they had no 
leader elected, even if there was a clean shutdown of the bootstrapping broker. 
So something is quite weird there. What might cause that ?

 

But yes, I think there is a clear case for KIP-491 in this scenario, of URP to 
just blacklist a broker from becoming leader until x factor is satisfied, or 
it's manually removed.

 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9522) kafka-connect failed when the topic can not be accessed

2020-02-07 Thread Deblock (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deblock updated KAFKA-9522:
---
Description: 
Kafka Connect fails if the topic cannot be accessed (permission issue or the 
topic doesn't exist).

 

This issue happened using Debezium CDC: 
[https://issues.redhat.com/browse/DBZ-1770]

 

The topic can be chosen using a column in the database. If the topic for a 
database row has an issue (permission issue or the topic doesn't exist), the 
connector stops working, and new events will not be sent.

 

The exception is thrown on `WorkerSourceTask` by the method 
`maybeThrowProducerSendException`. Maybe add a parameter to not fail on this 
exception. 

 

  was:
The kafka-connect fail if the topic can not be join (permission issue or topic 
doesn't exists).

 

This issue happend using Debezium CDC : 
[https://issues.redhat.com/browse/DBZ-1770]

 

The topic can be choosen using a column on the database. If the topic on a 
database contains an issue (permission issue or topic doesn't exists), the 
connect stop to work, and new event will not be sent.

 

 


> kafka-connect failed when the topic can not be accessed
> ---
>
> Key: KAFKA-9522
> URL: https://issues.apache.org/jira/browse/KAFKA-9522
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.3.1
>Reporter: Deblock
>Priority: Major
>
> Kafka Connect fails if the topic cannot be accessed (permission issue or the 
> topic doesn't exist).
>  
> This issue happened using Debezium CDC: 
> [https://issues.redhat.com/browse/DBZ-1770]
>  
> The topic can be chosen using a column in the database. If the topic for a 
> database row has an issue (permission issue or the topic doesn't exist), the 
> connector stops working, and new events will not be sent.
>  
> The exception is thrown on `WorkerSourceTask` by the method 
> `maybeThrowProducerSendException`. Maybe add a parameter to not fail on this 
> exception. 
>  
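
Purely as an illustration of that suggestion, a hypothetical sketch (the "tolerate" flag below does not exist in Kafka Connect today, and this is not the actual WorkerSourceTask code):

{code:java}
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.connect.errors.ConnectException;

// Hypothetical sketch of such a parameter. It assumes the task records the
// producer callback failure and rethrows it on the task thread.
final class ProducerSendFailurePolicy {

    private final AtomicReference<Throwable> producerSendException = new AtomicReference<>();
    private final boolean tolerateProducerSendErrors; // hypothetical, config-driven

    ProducerSendFailurePolicy(final boolean tolerateProducerSendErrors) {
        this.tolerateProducerSendErrors = tolerateProducerSendErrors;
    }

    void recordFailure(final Throwable t) {
        producerSendException.compareAndSet(null, t);
    }

    void maybeThrowProducerSendException() {
        final Throwable failure = producerSendException.get();
        if (failure == null) {
            return;
        }
        if (tolerateProducerSendErrors) {
            producerSendException.set(null); // drop the failed record and keep the task running
        } else {
            throw new ConnectException("Unrecoverable exception from producer send callback", failure);
        }
    }
}
{code}
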



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9522) kafka-connect failed when the topic can not be accessed

2020-02-07 Thread Deblock (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Deblock updated KAFKA-9522:
---
Description: 
Kafka Connect fails if the topic cannot be accessed (permission issue or the 
topic doesn't exist).

 

This issue happened using Debezium CDC: 
[https://issues.redhat.com/browse/DBZ-1770]

 

The topic can be chosen using a column in the database. If the topic for a 
database row has an issue (permission issue or the topic doesn't exist), the 
connector stops working, and new events will not be sent.

 

 

  was:
The kafka-connect fail if the topic can not be join (permission issue or topic 
doesn't exists).

 

This issue happend using Debezium CDC : 
[https://issues.redhat.com/browse/DBZ-1770]

 

The topic can be choosen using a column on the database. If the topic on a 
database contains an issue (permission issue or topic doesn't exists), the 
connect stop to work, and new event will not be sent.

 


> kafka-connect failed when the topic can not be accessed
> ---
>
> Key: KAFKA-9522
> URL: https://issues.apache.org/jira/browse/KAFKA-9522
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.3.1
>Reporter: Deblock
>Priority: Major
>
> Kafka Connect fails if the topic cannot be accessed (permission issue or the 
> topic doesn't exist).
>  
> This issue happened using Debezium CDC: 
> [https://issues.redhat.com/browse/DBZ-1770]
>  
> The topic can be chosen using a column in the database. If the topic for a 
> database row has an issue (permission issue or the topic doesn't exist), the 
> connector stops working, and new events will not be sent.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032285#comment-17032285
 ] 

Evan Williams edited comment on KAFKA-4084 at 2/7/20 11:58 AM:
---

[~sql_consulting]

We are using min.insync.replicas=1.  And have replication.factor=3 or above for 
most topics (6 brokers).
 As a side note, one interesting thing I've seen reported now from the owners 
of the clients (streams) is that, for certain topics/partitions - they had no 
leader elected, even if there was a clean shutdown of the bootstrapping broker. 
So something is quite weird there. What might cause that ?

 

But yes, I think there is a clear case for KIP-491 in this scenario, of URP to 
just blacklist a broker from becoming leader until x factor is satisfied, or 
it's manually removed.

 


was (Author: blodsbror):
[~sql_consulting]

We are using min.insync.replicas=1.  And have replication.factor=3 or above for 
most topics (6 brokers).
 As a side note, one interesting thing I've seen reported now from the owners 
of the clients (streams) is that, for certain topics/partitions - they had no 
leader, even if there was a clean shutdown of the bootstrapping broker. So 
something is quite weird there. What might cause that ?

 Topic was marked for deletion, without any user requesting this:

Topic: data.vehicle-topic.journey-dpi PartitionCount: 6 ReplicationFactor: 3 
Configs: cleanup.policy=delete MarkedForDeletion: true
 Topic: topic.name: 0 Leader: none Replicas: 54,52,53 Isr: 54 
MarkedForDeletion: true
 Topic: topic.name: 1 Leader: none Replicas: 82,53,54 Isr: 82 
MarkedForDeletion: true
 Topic: topic.name: 2 Leader: none Replicas: 83,54,82 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 3 Leader: none Replicas: 84,82,83 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 4 Leader: none Replicas: 52,83,84 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 5 Leader: none Replicas: 53,84,52 Isr: 53 
MarkedForDeletion: true

But yes, I think there is a clear case for KIP-491 in this scenario, of URP to 
just blacklist a broker from becoming leader until x factor is satisfied, or 
it's manually removed.

 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message

2020-02-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032325#comment-17032325
 ] 

ASF GitHub Bot commented on KAFKA-7052:
---

gunnarmorling commented on pull request #8059: KAFKA-7052 Adding option to 
ExtractField SMT for controlling behavior…
URL: https://github.com/apache/kafka/pull/8059
 
 
   … in case of non-existent fields
   
   https://issues.apache.org/jira/browse/KAFKA-7052
   
   *More detailed description of your change
   n/a
   
   *Summary of testing strategy (including rationale)
   Added JUnit test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
   @rhauch, hey, that's a quick first attempt for making the `ExtractField` SMT 
more flexible when it comes to encountering a record that doesn't contain the 
specified field. It's a common situation for connectors like Debezium which 
produce different "kinds" of records/topics; in our case e.g. actual change 
event topics and meta-topics such as TX data or schema history. One might want 
to apply the `ExtractField` SMT to the CDC records but not to those others. 
Also see [KAFKA-7052](https://issues.apache.org/jira/browse/KAFKA-7052) for 
some background.
   
   The proposal is to add a new option `behavior.on.non.existent.field` to the 
SMT which makes the behavior configurable. Its supported values are:
   
   * fail: raise an exception (default for records with schema)
   * return-null: return null (default for records without schema)
   * pass-on: pass on the unmodified original record
   
   I did a quick implementation of that proposal to foster feedback. Happy to 
adjust and expand as needed, e.g. to adjust with existing naming patterns for 
the option and/or its values as well as docs (not sure where that'd go). Thanks!
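
   For example, with the proposed option a connector configuration might look like this (the option name and values are exactly as proposed above and are not part of any release yet):

{code:java}
"transforms": "ExtractId",
"transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractId.field": "id",
"transforms.ExtractId.behavior.on.non.existent.field": "pass-on"
{code}
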
   
   CC @rmoff, @big-andy-coates.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ExtractField SMT throws NPE - needs clearer error message
> -
>
> Key: KAFKA-7052
> URL: https://issues.apache.org/jira/browse/KAFKA-7052
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>
> With the following Single Message Transform: 
> {code:java}
> "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
> "transforms.ExtractId.field":"id"{code}
> Kafka Connect errors with : 
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code}
> There should be a better error message here, identifying the reason for the 
> NPE.
> Version: Confluent Platform 4.1.1



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032285#comment-17032285
 ] 

Evan Williams edited comment on KAFKA-4084 at 2/7/20 10:57 AM:
---

[~sql_consulting]

We are using min.insync.replicas=1.  And have replication.factor=3 or above for 
most topics (6 brokers).
 As a side note, one interesting thing I've seen reported now from the owners 
of the clients (streams) is that, for certain topics/partitions - they had no 
leader, even if there was a clean shutdown of the bootstrapping broker. So 
something is quite weird there. What might cause that ?

 Topic was marked for deletion, without any user requesting this:

Topic: data.vehicle-topic.journey-dpi PartitionCount: 6 ReplicationFactor: 3 
Configs: cleanup.policy=delete MarkedForDeletion: true
 Topic: topic.name: 0 Leader: none Replicas: 54,52,53 Isr: 54 
MarkedForDeletion: true
 Topic: topic.name: 1 Leader: none Replicas: 82,53,54 Isr: 82 
MarkedForDeletion: true
 Topic: topic.name: 2 Leader: none Replicas: 83,54,82 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 3 Leader: none Replicas: 84,82,83 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 4 Leader: none Replicas: 52,83,84 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 5 Leader: none Replicas: 53,84,52 Isr: 53 
MarkedForDeletion: true

But yes, I think there is a clear case for KIP-491 in this scenario, of URP to 
just blacklist a broker from becoming leader until x factor is satisfied, or 
it's manually removed.

 


was (Author: blodsbror):
[~sql_consulting]

We are using min.insync.replicas=1.  And have replication.factor=3 or above for 
most topics (6 brokers).
 As a side note, one interesting thing I've seen reported now from the owners 
of the clients (streams) is that, for certain topics/partitions - they had no 
leader, even if there was a clean shutdown of the bootstrapping broker. So 
something is quite weird there. What might cause that ?

 

Topic: data.vehicle-topic.journey-dpi PartitionCount: 6 ReplicationFactor: 3 
Configs: cleanup.policy=delete MarkedForDeletion: true
 Topic: topic.name: 0 Leader: none Replicas: 54,52,53 Isr: 54 
MarkedForDeletion: true
 Topic: topic.name: 1 Leader: none Replicas: 82,53,54 Isr: 82 
MarkedForDeletion: true
 Topic: topic.name: 2 Leader: none Replicas: 83,54,82 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 3 Leader: none Replicas: 84,82,83 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 4 Leader: none Replicas: 52,83,84 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 5 Leader: none Replicas: 53,84,52 Isr: 53 
MarkedForDeletion: true

But yes, I think there is a clear case for KIP-491 in this scenario, of URP to 
just blacklist a broker from becoming leader until x factor is satisfied, or 
it's manually removed.

 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing 

[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032285#comment-17032285
 ] 

Evan Williams edited comment on KAFKA-4084 at 2/7/20 10:44 AM:
---

[~sql_consulting]

We are using min.insync.replicas=1.  And have replication.factor=3 or above for 
most topics (6 brokers).
 As a side note, one interesting thing I've seen reported now from the owners 
of the clients (streams) is that, for certain topics/partitions - they had no 
leader, even if there was a clean shutdown of the bootstrapping broker. So 
something is quite weird there. What might cause that ?

 

Topic: data.vehicle-topic.journey-dpi PartitionCount: 6 ReplicationFactor: 3 
Configs: cleanup.policy=delete MarkedForDeletion: true
 Topic: topic.name: 0 Leader: none Replicas: 54,52,53 Isr: 54 
MarkedForDeletion: true
 Topic: topic.name: 1 Leader: none Replicas: 82,53,54 Isr: 82 
MarkedForDeletion: true
 Topic: topic.name: 2 Leader: none Replicas: 83,54,82 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 3 Leader: none Replicas: 84,82,83 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 4 Leader: none Replicas: 52,83,84 Isr: 83 
MarkedForDeletion: true
 Topic: topic.name: 5 Leader: none Replicas: 53,84,52 Isr: 53 
MarkedForDeletion: true

But yes, I think there is a clear case for KIP-491 in this scenario, of URP to 
just blacklist a broker from becoming leader until x factor is satisfied, or 
it's manually removed.

 


was (Author: blodsbror):
[~sql_consulting]

We are using min.insync.replicas=1.  And have replication.factor=3 or above for 
most topics (6 brokers).
As a side note, one interesting thing I've seen reported now from the owners of 
the clients (streams) is that, for certain topics/partitions - they had no 
leader, even if there was a clean shutdown of the bootstrapping broker. So 
something is quite weird there..

But yes, I think there is a clear case for KIP-491 in this scenario, of URP to 
just blacklist a broker from becoming leader until x factor is satisfied, or 
it's manually removed.

 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032285#comment-17032285
 ] 

Evan Williams commented on KAFKA-4084:
--

[~sql_consulting]

We are using min.insync.replicas=1, and have replication.factor=3 or above for 
most topics (6 brokers).
As a side note, one interesting thing I've now seen reported by the owners of 
the clients (streams) is that, for certain topics/partitions, they had no 
leader, even though there was a clean shutdown of the bootstrapping broker. So 
something is quite weird there.

But yes, I think there is a clear case for KIP-491 in this URP scenario: just 
blacklist a broker from becoming leader until factor x is satisfied, or 
it is manually removed.

 

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032267#comment-17032267
 ] 

GEORGE LI edited comment on KAFKA-4084 at 2/7/20 10:02 AM:
---

[~blodsbror] 
[~junrao]
[~sriharsha] 

With `auto.leader.rebalance.enable=true`, even if we check that the broker is in 
ISR before rebalancing leadership to it, it might still impact the 
partitions this broker serves as leader for, because the broker is still 
trying to catch up.

[~blodsbror]
Is your cluster using a lossless setting like min.insync.replicas > 1? For 
lossless setups, we experience high producer latency during catch-up or 
reassignments. The leader-deprioritized-list feature is convenient in this 
case: it just puts the catching-up broker at the lowest priority when 
considering leaders. Another useful case is when the current 
controller is very busy with metadata requests; "blacklisting" it so it only 
serves as a follower can give 10-15% CPU back to the controller (without 
bouncing it). 

In a cluster without any down brokers and no URP (Under Replicated Partitions), 
there is a workaround: run reassignments to move that broker to the end of 
the partition assignment, e.g. broker_id 100 is down, then partition 
assignment (100, 101, 102) => (101, 102, 100). The reassignment should 
complete quickly because all replicas are in ISR; then running a preferred 
leader election will change the leader from 100 => 101. The downside is that 
it's more work to roll back or rebalance again. KIP-491 is very easy to roll 
back, just unset the dynamic config.

In the case of a cluster with URP, the reassignment approach of moving the 
broker to the end of the assignment might not work, because the broker is not 
in ISR; the reassignments will be pending until it catches up and rejoins the ISR. 

So maybe we do have a good use-case for KIP-491?





was (Author: sql_consulting):
[~blodsbror] [~junrao][~sriharsha] 

With `auto.leader.rebalance.enable=true`,  even with checking the broker is in 
ISR, then rebalance the leadership to this broker.  It might still impact the 
partition that this broker is serving leadership because the broker is still 
trying to catch up.  

[~blodsbror]  is your cluster  having lossless setting like min.insync.replicas 
> 1 ?   For lossless,  we experience high producer latency during  catchup or  
reassignments. The leader deprioritized list feature is convenient in this 
case. it will just put this broker catching up in the lowest priority when 
considering being the leader.   Another useful case is when the current 
controlller is very busy with metadata request,  "blacklist" it, and only 
serving as followers can give 10-15% CPU back to the controller (without 
bouncing it). 

In a cluster without any down brokers,  no URP (Under Replicated Partitions),   
there is a workaround to  run reassignments to move that broker to the end of 
the partition assignment,  e.g.  broker_id 100 is down.then partition 
assignment (100, 101, 102) => (101, 102, 100).  the reassignment should 
complete fast because all replicas in ISR.  then run preferred leader election 
will change the leader from 100 => 101.   The downside is:  its more work to 
rollback or rebalance again.KIP-491 is very easy to rollback, just unset 
the dynamic config.

In the case of the cluster with URP,  the reassignment approach to move the 
broker to the end of the assignment might not work, because the broker is not 
in ISR. the reassignments will be pending till it catches up and in ISR. 

So maybe we do have a good use-case for KIP-491 ?




> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. 

[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032267#comment-17032267
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror] [~junrao] [~sriharsha] 

With `auto.leader.rebalance.enable=true`, even if we check that the broker is in 
ISR before rebalancing leadership to it, it might still impact the 
partitions this broker serves as leader for, because the broker is still 
trying to catch up.

[~blodsbror] Is your cluster using a lossless setting like 
min.insync.replicas > 1? For lossless setups, we experience high producer 
latency during catch-up or reassignments. The leader-deprioritized-list 
feature is convenient in this case: it just puts the catching-up broker at the 
lowest priority when considering leaders. Another useful case is when the 
current controller is very busy with metadata requests; "blacklisting" it so 
it only serves as a follower can give 10-15% CPU back to the controller 
(without bouncing it). 

In a cluster without any down brokers and no URP (Under Replicated Partitions), 
there is a workaround: run reassignments to move that broker to the end of 
the partition assignment, e.g. broker_id 100 is down, then partition 
assignment (100, 101, 102) => (101, 102, 100). The reassignment should 
complete quickly because all replicas are in ISR; then running a preferred 
leader election will change the leader from 100 => 101. The downside is that 
it's more work to roll back or rebalance again. KIP-491 is very easy to roll 
back, just unset the dynamic config.

In the case of a cluster with URP, the reassignment approach of moving the 
broker to the end of the assignment might not work, because the broker is not 
in ISR; the reassignments will be pending until it catches up and rejoins the ISR. 

So maybe we do have a good use-case for KIP-491?
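
For illustration, the same workaround expressed with the Admin API available since 2.4 (topic, partition and broker ids below are made up):

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

// Move broker 100 to the end of the replica list for one partition, then
// trigger a preferred leader election for it.
public final class DemoteBroker {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            final TopicPartition tp = new TopicPartition("my-topic", 0);
            // (100, 101, 102) => (101, 102, 100): broker 100 is no longer the preferred leader
            final Map<TopicPartition, Optional<NewPartitionReassignment>> move =
                Collections.singletonMap(tp,
                    Optional.of(new NewPartitionReassignment(Arrays.asList(101, 102, 100))));
            admin.alterPartitionReassignments(move).all().get();
            // once the replicas are in sync, elect the (new) preferred leader
            admin.electLeaders(ElectionType.PREFERRED, Collections.singleton(tp)).partitions().get();
        }
    }
}
{code}
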




> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9522) kafka-connect failed when the topic can not be accessed

2020-02-07 Thread Deblock (Jira)
Deblock created KAFKA-9522:
--

 Summary: kafka-connect failed when the topic can not be accessed
 Key: KAFKA-9522
 URL: https://issues.apache.org/jira/browse/KAFKA-9522
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.3.1
Reporter: Deblock


Kafka Connect fails if the topic cannot be accessed (permission issue or the 
topic doesn't exist).

 

This issue happened using Debezium CDC: 
[https://issues.redhat.com/browse/DBZ-1770]

 

The topic can be chosen using a column in the database. If the topic for a 
database row has an issue (permission issue or the topic doesn't exist), the 
connector stops working, and new events will not be sent.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-02-07 Thread Evan Williams (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032200#comment-17032200
 ] 

Evan Williams commented on KAFKA-4084:
--

[~junrao]

Yes, I can see that it's been throttled to exactly 10MB/sec, so that is working. I'm 
not sure how the fetcher threads work, or if num.fetcher.threads=1 can still 
spawn multiple child threads that can take over the CPU (and how that interacts 
with the number of configured IO/network threads). But yes, 10MB/s on a 4-vCPU server 
shouldn't cause this. 

Anyway, I didn't mean to turn this into a support thread :) But it just shows 
that I could very much use a 'blacklist' feature to stop a broker from 
becoming leader while it replicates. Then it can hit higher CPU limits without 
affecting any clients. It seems that it's not easy to control load, even when 
trying to apply throttles. I am open to any other ideas to try, but yes - thanks 
for all your help.
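
For reference, the kind of dynamic replication throttle being discussed can be set per broker roughly like this (broker id and bootstrap server are made up; note that the topic-level *.replication.throttled.replicas configs also need to be set, which is omitted here):

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Set the leader/follower replication throttle on broker 101 to 10 MB/s as a
// dynamic broker config.
public final class ReplicationThrottle {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            final ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "101");
            admin.incrementalAlterConfigs(Collections.singletonMap(broker, Arrays.asList(
                new AlterConfigOp(new ConfigEntry("leader.replication.throttled.rate", "10485760"),
                                  AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("follower.replication.throttled.rate", "10485760"),
                                  AlterConfigOp.OpType.SET)))).all().get();
        }
    }
}
{code}
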

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)