[jira] [Created] (KAFKA-10396) Overall memory of container keeps growing due to Kafka Streams / RocksDB and is OOM-killed once limit reached

2020-08-12 Thread Vagesh Mathapati (Jira)
Vagesh Mathapati created KAFKA-10396:


 Summary: Overall memory of container keeps growing due to Kafka Streams / RocksDB and is OOM-killed once limit reached
 Key: KAFKA-10396
 URL: https://issues.apache.org/jira/browse/KAFKA-10396
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect, streams
Affects Versions: 2.5.0, 2.3.1
Reporter: Vagesh Mathapati


We are observing that the overall memory of our container keeps growing and never comes down.
After analysis we found that rocksdbjni.so keeps allocating 64M chunks of off-heap memory and never releases them. This causes an OOM kill once memory reaches the configured limit.

We use Kafka Streams and GlobalKTable for many of our Kafka topics.

Below is our environment
 * Kubernetes cluster
 * openjdk 11.0.7 2020-04-14 LTS
 * OpenJDK Runtime Environment Zulu11.39+16-SA (build 11.0.7+10-LTS)
 * OpenJDK 64-Bit Server VM Zulu11.39+16-SA (build 11.0.7+10-LTS, mixed mode)
 * Springboot 2.3
 * spring-kafka-2.5.0
 * kafka-streams-2.5.0
 * kafka-streams-avro-serde-5.4.0
 * rocksdbjni-5.18.3

We observed the same result with Kafka 2.3.

Below is a snippet of our analysis. From the pmap output we took the addresses of these 64M allocations (RSS):

Address Kbytes RSS Dirty Mode Mapping
7f3ce800 65536 65532 65532 rw--- [ anon ]
7f3cf400 65536 65536 65536 rw--- [ anon ]
7f3d6400 65536 65536 65536 rw--- [ anon ]

We tried to match these against memory allocation logs, enabled with the help of the Azul Systems team.

@ /tmp/librocksdbjni6564497922441568920.so: _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
 - 0x7f3ce8ff7ca0
@ /tmp/librocksdbjni6564497922441568920.so: _ZN7rocksdb15BlockBasedTable3GetERKNS_11ReadOptionsERKNS_5SliceEPNS_10GetContextEPKNS_14SliceTransformEb+0x894)[0x7f3e1c898fd4]
 - 0x7f3ce8ff9780
@ /tmp/librocksdbjni6564497922441568920.so: _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da]
 - 0x7f3ce8ff9750
@ /tmp/librocksdbjni6564497922441568920.so: _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
 - 0x7f3ce8ff97c0
@ /tmp/librocksdbjni6564497922441568920.so: _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da]
 - 0x7f3ce8ffccf0
@ /tmp/librocksdbjni6564497922441568920.so: _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
 - 0x7f3ce8ffcd10
@ /tmp/librocksdbjni6564497922441568920.so: _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0xfa)[0x7f3e1c65d5da]
 - 0x7f3ce8ffccf0
@ /tmp/librocksdbjni6564497922441568920.so: _Z18rocksdb_get_helperP7JNIEnv_PN7rocksdb2DBERKNS1_11ReadOptionsEPNS1_18ColumnFamilyHandleEP11_jbyteArrayii+0x261)[0x7f3e1c65d741]
 - 0x7f3ce8ffcd10


We also identified that the content of these 64M regions is all zeros, with no data present in them.

We tried to tune the RocksDB configuration as described in the guide below, but it did not help.
[https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
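For reference, here is a minimal sketch of the kind of bounded-memory config setter that guide describes; the class name and cache sizes below are illustrative choices, not values from the guide:

{code:java}
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

import java.util.Map;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L);  // shrink the 50 MB per-store default block cache
        tableConfig.setCacheIndexAndFilterBlocks(true);    // count index/filter blocks against the cache too
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);                // fewer memtables per store
    }
}
{code}

Such a class would be registered via the {{rocksdb.config.setter}} property ({{StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG}}). Note that with one GlobalKTable or store per topic, even modest per-store limits multiply across stores.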

 

Please let me know if you need any more details





[jira] [Commented] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-08-12 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176796#comment-17176796
 ] 

feyman commented on KAFKA-10307:


Hi, [~vvcephei]

I'm running with the trunk branch but maybe not the latest trunk, I will verify 
with the latest and get back to you, thanks!

> Topology cycles in 
> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> -
>
> Key: KAFKA-10307
> URL: https://issues.apache.org/jira/browse/KAFKA-10307
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Priority: Major
> Attachments: repartition_calc.jpg
>
>
> We have spotted a cycle in the topology for the foreign-key join test 
> *shouldInnerJoinMultiPartitionQueryable*; not sure yet whether this is a bug 
> in the algorithm or only in the test. Used 
> [https://zz85.github.io/kafka-streams-viz/] to visualize:
> {code:java}
> Sub-topology: 0
> Source: KTABLE-SOURCE-19 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Source: KTABLE-SOURCE-32 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Source: KSTREAM-SOURCE-01 (topics: [table1])
>   --> KTABLE-SOURCE-02
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-OUTPUT-21
>   <-- KTABLE-SOURCE-19
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: 
> [INNER-store1])
>   --> KTABLE-FK-JOIN-OUTPUT-34
>   <-- KTABLE-SOURCE-32
> Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2])
>   --> KTABLE-TOSTREAM-35
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Processor: KTABLE-SOURCE-02 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
>   <-- KSTREAM-SOURCE-01
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: 
> [])
>   --> KTABLE-SINK-11
>   <-- KTABLE-SOURCE-02
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: 
> [])
>   --> KTABLE-SINK-24
>   <-- KTABLE-FK-JOIN-OUTPUT-21
> Processor: KTABLE-TOSTREAM-35 (stores: [])
>   --> KSTREAM-SINK-36
>   <-- KTABLE-FK-JOIN-OUTPUT-34
> Sink: KSTREAM-SINK-36 (topic: output-)
>   <-- KTABLE-TOSTREAM-35
> Sink: KTABLE-SINK-11 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
> Sink: KTABLE-SINK-24 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
> Sub-topology: 1
> Source: KSTREAM-SOURCE-04 (topics: [table2])
>   --> KTABLE-SOURCE-05
> Source: KTABLE-SOURCE-12 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15
>   <-- KTABLE-SOURCE-12
> Processor: KTABLE-SOURCE-05 (stores: 
> [table2-STATE-STORE-03])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
>   <-- KSTREAM-SOURCE-04
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: 
> [table2-STATE-STORE-03])
>   --> KTABLE-SINK-18
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 (stores: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-SINK-18
>   <-- KTABLE-SOURCE-05
> Sink: KTABLE-SINK-18 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15, KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
> Sub-topology: 2
> Source: KSTREAM-SOURCE-0

[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2020-08-12 Thread GitBox


JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r469725796



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -646,22 +655,68 @@ private boolean isIntermediateTopic(final String topic) {
         return options.valuesOf(intermediateTopicsOption).contains(topic);
     }
 
-    private void maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
-        System.out.println("Deleting all internal/auto-created topics for application " + options.valueOf(applicationIdOption));
+    private int maybeDeleteInternalTopics(final Admin adminClient, final boolean dryRun) {
+        if (!options.valuesOf(internalTopicsOption).isEmpty()) {
+            return maybeDeleteSpecifiedInternalTopics(adminClient, dryRun);
+        } else {
+            return maybeDeleteInferredInternalTopics(adminClient, dryRun);
+        }
+    }
+
+    private int maybeDeleteSpecifiedInternalTopics(final Admin adminClient, final boolean dryRun) {
+        final List<String> internalTopics = options.valuesOf(internalTopicsOption);
+        int topicNotFound = EXIT_CODE_SUCCESS;
+
+        final List<String> topicsToDelete = new ArrayList<>();
+        final List<String> notFoundInternalTopics = new ArrayList<>();
+
+        System.out.println("Deleting specified internal/auto-created topics " + internalTopics);
+        for (final String topic : internalTopics) {
+            if (allTopics.contains(topic) && isInferredInternalTopic(topic)) {
+                topicsToDelete.add(topic);
+            } else {
+                notFoundInternalTopics.add(topic);
+            }
+        }
+
+        if (!notFoundInternalTopics.isEmpty()) {

Review comment:
   I would prefer to throw an exception, since it's likely that the user 
has made a mistake. I've implemented this now









[GitHub] [kafka] JoelWee commented on a change in pull request #8923: KAFKA-6435: KIP-623 Add internal topics option to streamResetter

2020-08-12 Thread GitBox


JoelWee commented on a change in pull request #8923:
URL: https://github.com/apache/kafka/pull/8923#discussion_r469722613



##
File path: docs/streams/developer-guide/app-reset-tool.html
##
@@ -77,6 +77,7 @@
 
 Step 1: Run the application reset tool
 Invoke the application reset tool from the command line
+Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with --dry-run to preview your changes before making them.

Review comment:
   👍  here's a screenshot for this portion:
   
https://user-images.githubusercontent.com/32009741/90100918-40b68300-dd70-11ea-8ae2-a08c385156da.png
   

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -167,7 +171,7 @@ public int run(final String[] args,
         final HashMap consumerConfig = new HashMap<>(config);
         consumerConfig.putAll(properties);
         exitCode = maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
-        maybeDeleteInternalTopics(adminClient, dryRun);
+        exitCode |= maybeDeleteInternalTopics(adminClient, dryRun);

Review comment:
   Yep. Currently it returns 1 if either exitCode is 1, and 0 otherwise. Or 
should we do something else?
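
   For illustration, a minimal sketch of why the bitwise OR combines the two steps correctly (assuming `EXIT_CODE_SUCCESS = 0` and an error code of 1, as in the tool; the helper names are hypothetical):
   
   ```java
   public class ExitCodeDemo {
       static int resetOffsetsStep() { return 0; }  // hypothetical: step succeeded
       static int deleteTopicsStep() { return 1; }  // hypothetical: step failed
   
       public static void main(final String[] args) {
           int exitCode = resetOffsetsStep();
           exitCode |= deleteTopicsStep();  // 0|0=0, 0|1=1, 1|0=1, 1|1=1
           System.out.println(exitCode);    // 1 iff at least one step failed
       }
   }
   ```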

##
File path: docs/streams/developer-guide/app-reset-tool.html
##
@@ -106,6 +107,11 @@ Step 1: Run the application reset tool
 (topics used in the through() method). For these topics, the tool
 will skip to the end.
+--internal-topics   Comma-separated list of internal topics to delete.
+                    Must be a subset of the internal topics marked for
+                    deletion by the default behaviour (do a dry-run
+                    without this option to view these topics).

Review comment:
   screenshot of this portion:
   
https://user-images.githubusercontent.com/32009741/90100950-52982600-dd70-11ea-9ae5-2b84296188cc.png
   









[GitHub] [kafka] abbccdda commented on a change in pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

2020-08-12 Thread GitBox


abbccdda commented on a change in pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#discussion_r469713175



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) {
 
     private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
         final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
-        if (outputRecords == null) {
-            if (!processorTopology.sinkTopics().contains(topicName)) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
-            }
+        if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) {

Review comment:
   Seems good enough as a bug fix, but I was wondering whether we could detect whether dynamic routing is configured, to make sure we are not letting other bugs slip through in TTD

##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -805,10 +805,10 @@ public void advanceWallClockTime(final Duration advance) {
 
     private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String topicName) {
         final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
-        if (outputRecords == null) {
-            if (!processorTopology.sinkTopics().contains(topicName)) {
-                throw new IllegalArgumentException("Unknown topic: " + topicName);
-            }
+        if (outputRecords == null && !processorTopology.sinkTopics().contains(topicName)) {
+            log.warn("Unrecognized topic: {}, this can occur if dynamic routing is used and no output has been "
+                + "sent to this topic yet. If not using a TopicNameExtractor, check that the output topic "
+                + "is correct. ", topicName);

Review comment:
   nit: could avoid the last space after `is correct.`









[GitHub] [kafka] albert02lowis commented on a change in pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…

2020-08-12 Thread GitBox


albert02lowis commented on a change in pull request #9108:
URL: https://github.com/apache/kafka/pull/9108#discussion_r469707761



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
##
@@ -123,25 +117,18 @@
     public static void setupConfigsAndUtils() {
 
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

Review comment:
   Hi @bbejeck I have done the required change in a new commit. I have also 
extracted out redundant calls to set `BOOTSTRAP_SERVERS_CONFIG` in the 
subclasses' test methods.









[GitHub] [kafka] albert02lowis commented on a change in pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…

2020-08-12 Thread GitBox


albert02lowis commented on a change in pull request #9108:
URL: https://github.com/apache/kafka/pull/9108#discussion_r469704319



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
##
@@ -123,25 +117,18 @@
     public static void setupConfigsAndUtils() {
 
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

Review comment:
   I see, let me add back this BOOTSTRAP_SERVERS_CONFIG inside 
setupConfigsAndUtils then 👍🏻 









[GitHub] [kafka] RamanVerma commented on a change in pull request #9112: KAFKA-10312 Fix error code returned by getPartitionMetadata

2020-08-12 Thread GitBox


RamanVerma commented on a change in pull request #9112:
URL: https://github.com/apache/kafka/pull/9112#discussion_r469698016



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -187,49 +191,24 @@ class MetadataCacheTest {
       new UpdateMetadataBroker()
         .setId(1)
         .setEndpoints(broker1Endpoints.asJava))
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(brokers, sslListenerName,
+    val metadataCacheBrokerId = 9

Review comment:
   Done









[jira] [Resolved] (KAFKA-10391) Streams should overwrite checkpoint excluding corrupted partitions

2020-08-12 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-10391.
---
Fix Version/s: 2.7.0
   Resolution: Fixed

> Streams should overwrite checkpoint excluding corrupted partitions
> --
>
> Key: KAFKA-10391
> URL: https://issues.apache.org/jira/browse/KAFKA-10391
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.7.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-9450 I discovered
> another bug in Streams: when some partitions are corrupted due to out-of-range
> offsets, we treat the task as corrupted, close it as dirty, and then revive it.
> However, we forget to overwrite the checkpoint file to exclude those
> out-of-range partitions so that they can be re-bootstrapped from the new
> log-start offset; hence when the task is revived, it still loads the old
> offset, starts from there, and hits the out-of-range exception again. This may
> cause {{StreamsUpgradeTest.test_app_upgrade}} to be flaky.
> We do not see this often because in the past we always deleted the checkpoint
> file after loading it, and we usually only see the out-of-range exception at
> the beginning of restoration, not during it.
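
A minimal sketch of the fix described above (the names are hypothetical, not the actual Streams internals): before reviving a corrupted task, rewrite its checkpoint without the corrupted partitions so they restore from the new log-start offset.

{code:java}
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class CheckpointSketch {
    // Drop corrupted partitions from the loaded checkpoint; the returned map is
    // what would be written back to the checkpoint file before the task is revived.
    static Map<TopicPartition, Long> excludeCorrupted(final Map<TopicPartition, Long> loaded,
                                                      final Set<TopicPartition> corrupted) {
        final Map<TopicPartition, Long> remaining = new HashMap<>(loaded);
        remaining.keySet().removeAll(corrupted);
        return remaining;
    }
}
{code}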





[GitHub] [kafka] guozhangwang merged pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

2020-08-12 Thread GitBox


guozhangwang merged pull request #9170:
URL: https://github.com/apache/kafka/pull/9170


   







[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469680702



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeUserScramCredentialsResult.java
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * The result of the {@link Admin#describeUserScramCredentials()} call.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeUserScramCredentialsResult {
+    private final KafkaFuture> future;

Review comment:
   Now that I'm working on this, I discovered that there is no other API 
that can describe or list everything that works this way.  Everything that can 
describe or list everything returns a single future.  Every describe or list 
API that returns a map of keys to futures requires a non-empty list of keys to 
describe or list.  For example:
   
   1. `listTopics()` lists all topics and returns a single `Future`; the 
`describeTopics()` API returns a map of names to futures but requires a 
non-empty list of topics to describe.
   2. `describeConfigs()` returns a map of resources to futures but requires a 
non-empty list of resources to describe.
   3. `describeLogDirs()` returns a map of broker IDs to futures but requires a 
non-empty list of brokers to describe.
   4. `describeReplicaLogDirs()` returns a map of replicas to futures but 
requires a non-empty list of replicas to describe.
   5. `describeConsumerGroups()` returns a map of consumer groups to futures 
but requires a non-empty list of consumer groups to describe.
   6. `listPartitionReassignments()` allows listing all or a subset of 
reassignments and returns a single future.
   7. `listOffsets()` returns a map of topic-partitions to futures but requires 
a non-empty list of topic-partitions to describe.
   8. `describeClientQuotas()` allows listing all or a subset of quotas and 
returns a single future.
   
   I think if we made this change here we would be off the beaten path.  That's 
not necessarily bad, but what tipped me off to this was the fact that when we 
list everything we have to create a future for every user that gets returned, 
and we don't know that list of users when we make the request, so there's 
really no way to implement it.
   
   We could create two separate APIs: one for describing some explicit, 
non-empty list of users, which would return a map of users to futures, and 
another one that describes everything, which returns a single future.  
`listTopics()` vs `describeTopics()` works this way, for example, though the 
information returned in the two is very different: when listing you just get 
the names, and when describing you get a lot more.  I don't see us 
distinguishing between listing vs. describing in terms of data -- we are going 
to send back the same two things (mechanism and iterations) regardless.  So we 
would probably be talking about creating a `describeUserScramCredentials()` API 
and a `describeAllUserScramCredentials()` API with the first taking a list and 
returning a map of futures and the second not taking a list and returning a 
single future.
   
   But I'm thinking we should just keep it the way it is -- take a possibly-empty list and return a single future regardless of whether the list was empty or not.
   
   Thoughts?
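
   For concreteness, the two result shapes being compared (a hypothetical interface for illustration, not the actual Admin API):
   
   ```java
   import org.apache.kafka.clients.admin.ScramCredentialInfo;
   import org.apache.kafka.common.KafkaFuture;
   
   import java.util.Collection;
   import java.util.Map;
   
   interface ResultShapes {
       // (a) One future for the whole answer: works even when the server decides
       // which users come back, e.g. when describing everything.
       KafkaFuture<Map<String, ScramCredentialInfo>> describeAll();
   
       // (b) One future per requested key: only possible when the caller names the
       // keys up front, which is why "describe everything" cannot use this shape.
       Map<String, KafkaFuture<ScramCredentialInfo>> describeSome(Collection<String> users);
   }
   ```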









[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

2020-08-12 Thread GitBox


chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-673212958


   ```org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true]``` failed. I tested it locally and it passes. retest this please







[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469650293



##
File path: 
core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
##
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.server.BaseRequestTest
+import kafka.utils.Exit
+import org.junit.Assert._
+import org.junit.Test
+
+class UserScramCredentialsCommandTest extends BaseRequestTest {
+  override def brokerCount = 1
+  var exitStatus: Option[Int] = None
+  var exitMessage: Option[String] = None
+
+  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None)
+
+  private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = {
+    val byteArrayOutputStream = new ByteArrayOutputStream()
+    val utf8 = StandardCharsets.UTF_8.name
+    val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
+    var exitStatus: Option[Int] = None
+    Exit.setExitProcedure { (status, _) =>

Review comment:
   Actually, I think this may not be an issue since parallel tests in 
Gradle run in separate processes rather than separate threads.  From 
https://docs.gradle.org/current/dsl/org.gradle.api.tasks.testing.Test.html: 
`"Test are always run in (one or more) separate JVMs."`









[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469646777



##
File path: 
core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
##
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.server.BaseRequestTest
+import kafka.utils.Exit
+import org.junit.Assert._
+import org.junit.Test
+
+class UserScramCredentialsCommandTest extends BaseRequestTest {
+  override def brokerCount = 1
+  var exitStatus: Option[Int] = None
+  var exitMessage: Option[String] = None
+
+  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None)
+
+  private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = {
+    val byteArrayOutputStream = new ByteArrayOutputStream()
+    val utf8 = StandardCharsets.UTF_8.name
+    val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
+    var exitStatus: Option[Int] = None
+    Exit.setExitProcedure { (status, _) =>

Review comment:
   Hmm, good point, I think there may be a problem here in general because 
there is only a single exit procedure that can be set globally, and multiple 
tests that set/reset it in parallel will collide.  There are 16 Scala test 
classes in `core` out of 260 that do this -- so 6% of test classes.  So I think 
this will introduce some flakiness to these 16 tests.  Does this sound correct 
to you, and should we open a separate ticket for this as opposed to trying to 
fix it here?
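
   To make the shared state concrete, a sketch of the pattern in question using the client-side `org.apache.kafka.common.utils.Exit` (the Scala `kafka.utils.Exit` used in the test delegates to a similar JVM-wide hook):
   
   ```java
   import org.apache.kafka.common.utils.Exit;
   
   public class ExitHookDemo {
       public static void main(final String[] args) {
           // The exit procedure is a single JVM-wide static, so two tests installing
           // it concurrently in the same JVM would race; Gradle's process-per-worker
           // test execution avoids this.
           Exit.setExitProcedure((statusCode, message) -> {
               throw new IllegalStateException("exit(" + statusCode + ") called during test");
           });
           try {
               // ... run the command under test ...
           } finally {
               Exit.resetExitProcedure();
           }
       }
   }
   ```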









[GitHub] [kafka] ableegoldman commented on pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

2020-08-12 Thread GitBox


ableegoldman commented on pull request #9174:
URL: https://github.com/apache/kafka/pull/9174#issuecomment-673189749


   @cadonna @guozhangwang 







[GitHub] [kafka] hachikuji commented on pull request #9163: KAFKA-10386; Fix flexible version support for `records` type

2020-08-12 Thread GitBox


hachikuji commented on pull request #9163:
URL: https://github.com/apache/kafka/pull/9163#issuecomment-673189746


   retest this please







[GitHub] [kafka] ableegoldman opened a new pull request #9174: KAFKA-10395: relax output topic check in TTD to work with dynamic routing

2020-08-12 Thread GitBox


ableegoldman opened a new pull request #9174:
URL: https://github.com/apache/kafka/pull/9174


   I went through all 5 stages of grief in thinking about what to do here and 
decided the best thing was to just relax the check after all. Hopefully users 
who find their output topic unexpectedly empty due to a typo in the topic name 
will be able to figure it out quickly from the warning we now log instead.
   
   Not sure how rampant the problem of output-topic-typos is to begin with...







[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469637266



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialUpsertion.java
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.Objects;
+
+/**
+ * A request to update/insert a SASL/SCRAM credential for a user.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a>
+ */
+public class UserScramCredentialUpsertion extends UserScramCredentialAlteration {
+    private final ScramCredentialInfo info;
+    private final byte[] salt;
+    private final byte[] password;
+
+    /**
+     * Constructor that generates a random salt
+     *
+     * @param user the user for which the credential is to be updated/inserted
+     * @param credentialInfo the mechanism and iterations to be used
+     * @param password the password
+     */
+    public UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, String password) {
+        this(user, credentialInfo, password.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Constructor that generates a random salt
+     *
+     * @param user the user for which the credential is to be updated/inserted
+     * @param credentialInfo the mechanism and iterations to be used
+     * @param password the password
+     */
+    public UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password) {
+        this(user, credentialInfo, password, generateRandomSalt());
+    }
+
+    /**
+     * Constructor that accepts an explicit salt
+     *
+     * @param user the user for which the credential is to be updated/inserted
+     * @param credentialInfo the mechanism and iterations to be used
+     * @param password the password
+     * @param salt the salt to be used
+     */
+    public UserScramCredentialUpsertion(String user, ScramCredentialInfo credentialInfo, byte[] password, byte[] salt) {
+        super(Objects.requireNonNull(user));
+        this.info = Objects.requireNonNull(credentialInfo);
+        this.password = Objects.requireNonNull(password);
+        this.salt = Objects.requireNonNull(salt);
+    }
+
+    /**
+     *
+     * @return the mechanism and iterations
+     */
+    public ScramCredentialInfo credentialInfo() {
+        return info;
+    }
+
+    /**
+     *
+     * @return the salt
+     */
+    public byte[] salt() {
+        return salt;
+    }
+
+    /**
+     *
+     * @return the password
+     */
+    public byte[] password() {
+        return password;
+    }
+
+    private static byte[] generateRandomSalt() {
+        return new BigInteger(130, new SecureRandom()).toString(Character.MAX_RADIX).getBytes(StandardCharsets.UTF_8);

Review comment:
   @cmccabe  I think the approach you suggest leaves out how to identify 
`length` which itself needs to be randomized.  I got the current implementation 
from `org.apache.kafka.common.security.scram.internals.ScramFormatter`.  I 
would have invoked `ScramFormatter.secureRandomBytes()` directly, but it is not 
`static` and I did not want to either instantiate an instance or change methods 
to static (though the class is internal, so I could have done that). I instead 
replicated the logic here.  The array length ends up being random with this 
approach, as do the bytes in the array.  Let me know what you think.  Currently 
I've left this as-is.
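
   As a quick illustration of the length randomness being discussed (a standalone sketch, not code from the PR): a 130-bit random `BigInteger` printed in base 36 has a magnitude-dependent digit count, so the salt's byte length varies too.
   
   ```java
   import java.math.BigInteger;
   import java.nio.charset.StandardCharsets;
   import java.security.SecureRandom;
   
   public class SaltLengthDemo {
       public static void main(final String[] args) {
           final SecureRandom random = new SecureRandom();
           for (int i = 0; i < 5; i++) {
               final byte[] salt = new BigInteger(130, random)
                   .toString(Character.MAX_RADIX)                  // base-36 digits
                   .getBytes(StandardCharsets.UTF_8);
               System.out.println("salt length = " + salt.length); // typically 25 or 26
           }
       }
   }
   ```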









[jira] [Assigned] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-12 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-10395:
---

Assignee: Sophie Blee-Goldman

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName);
>     }
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  
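
To see the failure mode concretely, a self-contained sketch (the topic names and the key-based extractor are illustrative):

{code:java}
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import java.util.Properties;

public class DynamicRoutingTtdExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // The TopicNameExtractor routes each record by key, so the topology cannot
        // enumerate its sink topics up front.
        builder.<String, String>stream("input").to((key, value, context) -> "output-" + key);

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ttd-dynamic-routing");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("input", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> output =
                driver.createOutputTopic("output-a", new StringDeserializer(), new StringDeserializer());
            // Reading "output-a" before any record reaches it hits the misleading
            // IllegalArgumentException("Unknown topic: ...") described above.
            input.pipeInput("a", "v1");
            System.out.println(output.readKeyValue()); // KeyValue(a, v1)
        }
    }
}
{code}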





[jira] [Commented] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-12 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176675#comment-17176675
 ] 

Sophie Blee-Goldman commented on KAFKA-10395:
-

The only really holistic solution I can think of would be to add some API to 
register the possible output topics on the TopicNameExtractor itself. But that 
seems like overkill here and honestly just replacing the exception with a 
warning log sounds good enough

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName);
>     }
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  





[jira] [Resolved] (KAFKA-10042) Make INVALID_PRODUCER_EPOCH abortable from Produce response

2020-08-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10042.
-
Resolution: Fixed

> Make INVALID_PRODUCER_EPOCH abortable from Produce response
> ---
>
> Key: KAFKA-10042
> URL: https://issues.apache.org/jira/browse/KAFKA-10042
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer 
>Reporter: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>






[jira] [Assigned] (KAFKA-10042) Make INVALID_PRODUCER_EPOCH abortable from Produce response

2020-08-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-10042:
---

Assignee: Boyang Chen

> Make INVALID_PRODUCER_EPOCH abortable from Produce response
> ---
>
> Key: KAFKA-10042
> URL: https://issues.apache.org/jira/browse/KAFKA-10042
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer 
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>






[jira] [Updated] (KAFKA-10042) Make INVALID_PRODUCER_EPOCH abortable from Produce response

2020-08-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10042:

Fix Version/s: 2.7.0

> Make INVALID_PRODUCER_EPOCH abortable from Produce response
> ---
>
> Key: KAFKA-10042
> URL: https://issues.apache.org/jira/browse/KAFKA-10042
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer 
>Reporter: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>






[jira] [Comment Edited] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-12 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176665#comment-17176665
 ] 

Sophie Blee-Goldman edited comment on KAFKA-10395 at 8/13/20, 12:41 AM:


I think we could just modify the TestOutputTopic constructor to automatically 
register the output topic with the TTD, which wouldn't need a KIP. This 
wouldn't help users who just read from the TTD directly instead of using the 
new TestOutputTopic class, but maybe that's sufficient. We can just tell people 
to always use TestOutputTopic if their topology has dynamic routing?

 

edit: Actually it looks like the only public APIs that run into this issue are 
from TestOutputTopic, so we can avoid a KIP. That said, registering the topics 
for which a TestOutputTopic instance is created is not all that different from 
relaxing/removing the check altogether. So maybe we should just do that – we 
can at least log a warning still


was (Author: ableegoldman):
I think we could just modify the TestOutputTopic constructor to automatically 
register the output topic with the TTD, which wouldn't need a KIP. This 
wouldn't help users who just read from the TTD directly instead of using the 
new TestOutputTopic class, but maybe that's sufficient. We can just tell people 
to always use TestOutputTopic if their topology has dynamic routing?

 

edit: Actually it looks like the only public APIs that run into this issue are 
from TestOutputTopic, so we can avoid a KIP

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName);
>     }
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  





[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469627624



##
File path: 
core/src/test/scala/unit/kafka/server/DescribeUserScramCredentialsRequestTest.scala
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.util
+import java.util.Properties
+
+import kafka.network.SocketServer
+import kafka.security.authorizer.AclAuthorizer
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData
+import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse}
+import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Test DescribeUserScramCredentialsRequest/Response API for the cases where no credentials exist
+ * or failure is expected due to lack of authorization, sending the request to a non-controller broker, or some other issue.
+ * Testing the API for the case where there are actually credentials to describe is performed elsewhere.
+ */
+class DescribeUserScramCredentialsRequestTest extends BaseRequestTest {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+    properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[DescribeCredentialsTest.TestAuthorizer].getName)
+    properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[DescribeCredentialsTest.TestPrincipalBuilder].getName)
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    DescribeCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized
+    super.setUp()
+  }
+
+  @Test
+  def testDescribeNothing(): Unit = {
+    val request = new DescribeUserScramCredentialsRequest.Builder(
+      new DescribeUserScramCredentialsRequestData()).build()
+    val response = sendDescribeUserScramCredentialsRequest(request)
+
+    val error = response.data.error
+    assertEquals("Expected no error when routed correctly", Errors.NONE.code, error)
+    assertEquals("Expected no credentials", 0, response.data.userScramCredentials.size)
+  }
+
+  @Test
+  def testDescribeNotController(): Unit = {
+    val request = new DescribeUserScramCredentialsRequest.Builder(
+      new DescribeUserScramCredentialsRequestData()).build()
+    val response = sendDescribeUserScramCredentialsRequest(request, notControllerSocketServer)
+
+    val error = response.data.error
+    assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER.code, error)
+  }
+
+  @Test
+  def testDescribeNotAuthorized(): Unit = {
+    DescribeCredentialsTest.principal = DescribeCredentialsTest.UnauthorizedPrincipal
+
+    val request = new DescribeUserScramCredentialsRequest.Builder(
+      new DescribeUserScramCredentialsRequestData()).build()
+    val response = sendDescribeUserScramCredentialsRequest(request)
+
+    val error = response.data.error
+    assertEquals("Expected not authorized error", Errors.CLUSTER_AUTHORIZATION_FAILED.code, error)
+  }
+
+  @Test
+  def testDescribeSameUserTwice(): Unit = {
+    val user = new UserName().setName("user1")
+    val request = new DescribeUserScramCredentialsRequest.Builder(
+      new DescribeUserScramCredentialsRequestData().setUsers(List(user, user).asJava)).build()
+    val response = sendDescribeUserScramCredentialsRequest(request)
+
+    val error = response.data.error
+    assertEquals("Expected invalid request error", Errors.INVALID_REQUEST.code, error)
+  }
+
+
+  private def sendDescribeUserScramCredentialsRequest(request: DescribeUserScramCredentialsRequest, socketServer: SocketServer = controllerSocketServer): Desc

[jira] [Updated] (KAFKA-9803) Allow producers to recover gracefully from transaction timeouts

2020-08-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9803:
---
Fix Version/s: 2.7.0

> Allow producers to recover gracefully from transaction timeouts
> ---
>
> Key: KAFKA-9803
> URL: https://issues.apache.org/jira/browse/KAFKA-9803
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer , streams
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.7.0
>
>
> Transaction timeouts are detected by the transaction coordinator. When the 
> coordinator detects a timeout, it bumps the producer epoch and aborts the 
> transaction. The epoch bump is necessary in order to prevent the current 
> producer from beginning to write to a new transaction which was not 
> started through the coordinator.  
> Transactions may also be aborted if a new producer with the same 
> `transactional.id` starts up; similarly, this results in an epoch bump. 
> Currently the coordinator does not distinguish these two cases. Both end 
> up as a `ProducerFencedException`, which means the producer needs to shut 
> itself down. 
> We can improve this with the new APIs from KIP-360. When the coordinator 
> times out a transaction, it can remember that fact and allow the existing 
> producer to claim the bumped epoch and continue. Roughly, the logic would work 
> like this:
> 1. When a transaction times out, set lastProducerEpoch to the current epoch 
> and do the normal bump.
> 2. Any transactional requests from the old epoch result in a new 
> TRANSACTION_TIMED_OUT error code, which is propagated to the application.
> 3. The producer recovers by sending InitProducerId with the current epoch. 
> The coordinator returns the bumped epoch.
> One issue that needs to be addressed is how to handle INVALID_PRODUCER_EPOCH 
> from Produce requests. Partition leaders will not generally know whether a 
> bumped epoch was the result of a timed-out transaction or a fenced producer. 
> Possibly the producer can treat these errors as abortable when they come from 
> Produce responses. In that case, the user would try to abort the transaction, 
> and then we can see whether it was due to a timeout or otherwise.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-12 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176665#comment-17176665
 ] 

Sophie Blee-Goldman edited comment on KAFKA-10395 at 8/13/20, 12:36 AM:


I think we could just modify the TestOutputTopic constructor to automatically 
register the output topic with the TTD, which wouldn't need a KIP. This 
wouldn't help users who just read from the TTD directly instead of using the 
new TestOutputTopic class, but maybe that's sufficient. We can just tell people 
to always use TestOutputTopic if their topology has dynamic routing?

 

edit: Actually it looks like the only public APIs that run into this issue are 
from TestOutputTopic, so we can avoid a KIP


was (Author: ableegoldman):
I think we could just modify the TestOutputTopic constructor to automatically 
register the output topic with the TTD, which wouldn't need a KIP. This 
wouldn't help users who just read from the TTD directly instead of using the 
new TestOutputTopic class, but maybe that's sufficient. We can just tell people 
to always use TestOutputTopic if their topology has dynamic routing?

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName);
>     }
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-12 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10395:

Labels: test-framework  (was: needs-kip test-framework)

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName);
>     }
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #9104: KAFKA-10266: Update the connector config header.converter

2020-08-12 Thread GitBox


showuon commented on pull request #9104:
URL: https://github.com/apache/kafka/pull/9104#issuecomment-673173805


   @kkonstantine , could you review this PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-12 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176665#comment-17176665
 ] 

Sophie Blee-Goldman commented on KAFKA-10395:
-

I think we could just modify the TestOutputTopic constructor to automatically 
register the output topic with the TTD, which wouldn't need a KIP. This 
wouldn't help users who just read from the TTD directly instead of using the 
new TestOutputTopic class, but maybe that's sufficient. We can just tell people 
to always use TestOutputTopic if their topology has dynamic routing?

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, test-framework
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName);
>     }
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-12 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10395:

Labels: needs-kip test-framework  (was: test-framework)

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, test-framework
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName);
>     }
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-12 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10395:

Labels: test-framework  (was: )

> TopologyTestDriver does not work with dynamic topic routing
> ---
>
> Key: KAFKA-10395
> URL: https://issues.apache.org/jira/browse/KAFKA-10395
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: test-framework
>
> The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
> checks 
>  
> {code:java}
> final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
> if (outputRecords == null) {
>     if (!processorTopology.sinkTopics().contains(topicName)) {
>         throw new IllegalArgumentException("Unknown topic: " + topicName);
>     }
> }
> {code}
> The outputRecordsByTopic map keeps track of all topics that are actually 
> produced to, but obviously doesn't capture any topics that haven't yet 
> received output. The `processorTopology#sinkTopics` is supposed to account 
> for that by checking to make sure the topic is actually registered in the 
> topology, and throw an exception if not in case the user supplied the wrong 
> topic name to read from. 
> Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
> any topic, so the topology isn't aware of all the possible output topics. If 
> trying to read from one of these topics that happens to not have received any 
> output yet, the test will throw the above misleading IllegalArgumentException.
> We could just relax this check, but warning users who may actually have 
> accidentally passed in the wrong topic to read from seems quite useful. A 
> better solution would be to require registering all possible output topics to 
> the TTD up front. This would obviously require a KIP, but it would be a very 
> small one and shouldn't be too much trouble
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10395) TopologyTestDriver does not work with dynamic topic routing

2020-08-12 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10395:
---

 Summary: TopologyTestDriver does not work with dynamic topic 
routing
 Key: KAFKA-10395
 URL: https://issues.apache.org/jira/browse/KAFKA-10395
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman


The TopologyTestDriver#read(topic) methods all call #getRecordsQueue which 
checks 

 
{code:java}
final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topicName);
if (outputRecords == null) {
    if (!processorTopology.sinkTopics().contains(topicName)) {
        throw new IllegalArgumentException("Unknown topic: " + topicName);
    }
}
{code}
The outputRecordsByTopic map keeps track of all topics that are actually 
produced to, but obviously doesn't capture any topics that haven't yet received 
output. The `processorTopology#sinkTopics` is supposed to account for that by 
checking to make sure the topic is actually registered in the topology, and 
throw an exception if not in case the user supplied the wrong topic name to 
read from. 

Unfortunately the TopicNameExtractor allows for dynamic routing of records to 
any topic, so the topology isn't aware of all the possible output topics. If 
trying to read from one of these topics that happens to not have received any 
output yet, the test will throw the above misleading IllegalArgumentException.

We could just relax this check, but warning users who may actually have 
accidentally passed in the wrong topic to read from seems quite useful. A 
better solution would be to require registering all possible output topics to 
the TTD up front. This would obviously require a KIP, but it would be a very 
small one and shouldn't be too much trouble
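
To make the failure mode concrete, here is a minimal reproduction sketch (an editor's addition; the topic names, dummy bootstrap address, and key-based routing are illustrative, assuming the 2.6 test-utils API):

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class DynamicRoutingRepro {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // Route each record to a topic derived from its key, so the topology
        // cannot know the full set of output topics up front.
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .to((key, value, ctx) -> "out-" + key,
                   Produced.with(Serdes.String(), Serdes.String()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dynamic-routing-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            driver.createInputTopic("input", new StringSerializer(), new StringSerializer())
                  .pipeInput("a", "v1");
            // "out-a" has received output, so this read succeeds ...
            System.out.println(driver.createOutputTopic("out-a",
                new StringDeserializer(), new StringDeserializer()).readValue());
            // ... but "out-b" is a perfectly valid dynamic target that has not
            // received output yet, so this read throws the misleading
            // IllegalArgumentException("Unknown topic: out-b").
            driver.createOutputTopic("out-b",
                new StringDeserializer(), new StringDeserializer()).readValue();
        }
    }
}
{code}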

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] badaiaqrandista commented on pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-08-12 Thread GitBox


badaiaqrandista commented on pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#issuecomment-673156754


   @dajac Can you please re-review the changes I've made?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9910) Implement new transaction timed out error

2020-08-12 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao reassigned KAFKA-9910:
--

Assignee: HaiyuanZhao

> Implement new transaction timed out error
> -
>
> Key: KAFKA-9910
> URL: https://issues.apache.org/jira/browse/KAFKA-9910
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on a change in pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…

2020-08-12 Thread GitBox


bbejeck commented on a change in pull request #9108:
URL: https://github.com/apache/kafka/pull/9108#discussion_r469593164



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
##
@@ -123,25 +117,18 @@
 public static void setupConfigsAndUtils() {
 
     STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

Review comment:
   @albert02lowis I've confirmed locally that the test failures are 
related.  The `TopologyTestDriver` still needs a bootstrap servers config value.
   Adding something like 
`STREAMS_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "test:");` 
into the `AbstractJoinIntegrationTest.setupConfigsAndUtils()` gets all tests 
passing. 
   
   I'm not sure how the JDK 14 tests passed, as locally I got the same errors as 
the JDK 8 and 11 builds.
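
   (Editor's note: for readers following along, a minimal sketch of the kind of placeholder config a `TopologyTestDriver`-based test needs; the application id and dummy address below are illustrative.)

   ```java
   // TopologyTestDriver never contacts a broker, but StreamsConfig validation
   // still requires bootstrap.servers, so any placeholder value satisfies it.
   // Assumes the usual imports: java.util.Properties, StreamsConfig, TopologyTestDriver.
   Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); // never used
   TopologyTestDriver driver = new TopologyTestDriver(topology, props);
   ```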





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on a change in pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…

2020-08-12 Thread GitBox


bbejeck commented on a change in pull request #9108:
URL: https://github.com/apache/kafka/pull/9108#discussion_r469593164



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
##
@@ -123,25 +117,18 @@
 public static void setupConfigsAndUtils() {
 
     STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-    STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

Review comment:
   @albert02lowis I've confirmed locally that the test failures are 
related.  The `TopologyTestDriver` still needs a bootstrap servers config value.
   Adding something like 
`STREAMS_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "test:");` 
into the `AbstractJoinIntegrationTest.setupConfigsAndUtils()` gets all tests 
passing. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8834: MINOR: Do not disable heartbeat during Rebalance

2020-08-12 Thread GitBox


ableegoldman commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r469593402



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -1069,6 +1069,13 @@ private HeartbeatResponseHandler(final Generation 
generation) {
 public void handle(HeartbeatResponse heartbeatResponse, 
RequestFuture future) {
 sensors.heartbeatSensor.record(response.requestLatencyMs());
 Errors error = heartbeatResponse.error();
+
+if (state != MemberState.STABLE) {

Review comment:
   I agree with @vvcephei , we should aim to keep this fix as simple as 
possible. The reason for this PR is that members are getting kicked from the 
group unnecessarily, not that members who are actually supposed to be fenced 
don't rejoin fast enough.
   If we fix KAFKA-10122 and people start complaining that members fenced 
during a rebalance take too long to find out, then we can revisit this 
optimization. But if KAFKA-10122 is fixed then it seems like the occurrence of 
members dropping out during a rebalance will go way down anyway, so we'd be 
solving a vanishingly small problem





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-08-12 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10284:

Priority: Critical  (was: Major)

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Critical
>  Labels: help-wanted
>
> When a known static member rejoins, we update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose of avoiding 
> unnecessary rebalances for static membership, as well as fencing anyone who 
> still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing back the zombie member 
> identity.
> Credit for finding the bug goes to [~hachikuji] 
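
For readers unfamiliar with static membership: a consumer opts in by setting group.instance.id, as in the following minimal sketch (an editor's addition; the group and instance names are illustrative):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // The static identity: a member rejoining with the same
        // group.instance.id keeps its assignment without a rebalance, and the
        // broker hands it a fresh member.id; that value is what this bug
        // fails to persist.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "payment-consumer-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
{code}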



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-08-12 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-10284:
---

Assignee: Boyang Chen

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: help-wanted
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-08-12 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176639#comment-17176639
 ] 

Boyang Chen edited comment on KAFKA-10284 at 8/12/20, 10:48 PM:


I would say for all. Let me check if [~feyman] is interested in picking this up.


was (Author: bchen225242):
I would say for all.

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-08-12 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10284:

Fix Version/s: (was: 2.6.1)

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-08-12 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176639#comment-17176639
 ] 

Boyang Chen commented on KAFKA-10284:
-

I would say for all.

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Priority: Major
>  Labels: help-wanted
> Fix For: 2.6.1
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state

2020-08-12 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176636#comment-17176636
 ] 

Randall Hauch commented on KAFKA-9066:
--

Merged https://github.com/apache/kafka/pull/8854 to the 2.5 branch (see 
KAFKA-10146) and updated the FIX VERSIONS above.

> Kafka Connect JMX : source & sink task metrics missing for tasks in failed 
> state
> 
>
> Key: KAFKA-9066
> URL: https://issues.apache.org/jira/browse/KAFKA-9066
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1
>Reporter: Mikołaj Stefaniak
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.7.0, 2.5.2
>
>
> h2. Overview
> Kafka Connect exposes various metrics via JMX. Those metrics can be exported, 
> e.g. by the _Prometheus JMX Exporter_, for further processing.
> One of the crucial attributes is the connector's *task status.*
> According to the official Kafka docs, the status is available as the +status+ 
> attribute of the following MBean:
> {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}" status
>  - The status of the connector task. One of 'unassigned', 'running', 
> 'paused', 'failed', or 'destroyed'.
> {quote}
> h2. Issue
> Generally +connector-task-metrics+ are exposed properly for tasks in +running+ 
> status but not exposed at all if the task is +failed+.
> A failed task *appears* properly with failed status when queried via the *REST API*:
>  
> {code:java}
> $ curl -X GET -u 'user:pass' 
> http://kafka-connect.mydomain.com/connectors/customerconnector/status
> {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException:
>  Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"}
> $ {code}
>  
> A failed task *doesn't appear* as a bean of type +connector-task-metrics+ when 
> queried via *JMX*:
>  
> {code:java}
> $ echo "beans -d kafka.connect" | java -jar 
> target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep 
> connector=customerconnector
> kafka.connect:connector=customerconnector,task=0,type=task-error-metrics
> kafka.connect:connector=customerconnector,type=connector-metrics
> $
> {code}
> h2. Expected result
> It is expected that a bean of type +connector-task-metrics+ will also appear 
> for tasks that have failed.
> Below is an example of how beans are properly registered for tasks in the 
> Running state:
>  
> {code:java}
> $ echo "get -b 
> kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics
>  status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l 
> localhost:8081 -n -v silent
> status = running;
> $
> {code}
>  
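
To reproduce the JMX lookup programmatically, here is a minimal probe sketch (an editor's addition; the JMX URL, port, and connector/task names follow the examples above and are otherwise illustrative):

{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class TaskStatusProbe {
    public static void main(final String[] args) throws Exception {
        final JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:8081/jmxrmi");
        try (final JMXConnector connector = JMXConnectorFactory.connect(url)) {
            final MBeanServerConnection mbs = connector.getMBeanServerConnection();
            final ObjectName name = new ObjectName(
                "kafka.connect:type=connector-task-metrics,connector=customerconnector,task=0");
            // Per this report, the bean is absent for FAILED tasks, so this
            // lookup throws InstanceNotFoundException instead of returning "failed".
            System.out.println(mbs.getAttribute(name, "status"));
        }
    }
}
{code}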



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8834: MINOR: Do not disable heartbeat during Rebalance

2020-08-12 Thread GitBox


ableegoldman commented on a change in pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#discussion_r469587219



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) {
 }
 
 private void recordRebalanceFailure() {
-state = MemberState.UNJOINED;

Review comment:
   I saw this comment too; how is it that the heartbeat can interfere with 
the join group? Also, don't we need to remove the `disableHeartbeatThread` call 
that Boyang pointed to as well? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch merged pull request #8854: KAFKA-10146, KAFKA-9066: Retain metrics for failed tasks (backport to 2.5)

2020-08-12 Thread GitBox


rhauch merged pull request #8854:
URL: https://github.com/apache/kafka/pull/8854


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state

2020-08-12 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9066:
-
Fix Version/s: 2.5.2

> Kafka Connect JMX : source & sink task metrics missing for tasks in failed 
> state
> 
>
> Key: KAFKA-9066
> URL: https://issues.apache.org/jira/browse/KAFKA-9066
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1
>Reporter: Mikołaj Stefaniak
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.7.0, 2.5.2
>
>
> h2. Overview
> Kafka Connect exposes various metrics via JMX. Those metrics can be exported, 
> e.g. by the _Prometheus JMX Exporter_, for further processing.
> One of the crucial attributes is the connector's *task status.*
> According to the official Kafka docs, the status is available as the +status+ 
> attribute of the following MBean:
> {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}" status
>  - The status of the connector task. One of 'unassigned', 'running', 
> 'paused', 'failed', or 'destroyed'.
> {quote}
> h2. Issue
> Generally +connector-task-metrics+ are exposed properly for tasks in +running+ 
> status but not exposed at all if the task is +failed+.
> A failed task *appears* properly with failed status when queried via the *REST API*:
>  
> {code:java}
> $ curl -X GET -u 'user:pass' 
> http://kafka-connect.mydomain.com/connectors/customerconnector/status
> {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException:
>  Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"}
> $ {code}
>  
> A failed task *doesn't appear* as a bean of type +connector-task-metrics+ when 
> queried via *JMX*:
>  
> {code:java}
> $ echo "beans -d kafka.connect" | java -jar 
> target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep 
> connector=customerconnector
> kafka.connect:connector=customerconnector,task=0,type=task-error-metrics
> kafka.connect:connector=customerconnector,type=connector-metrics
> $
> {code}
> h2. Expected result
> It is expected that a bean of type +connector-task-metrics+ will also appear 
> for tasks that have failed.
> Below is an example of how beans are properly registered for tasks in the 
> Running state:
>  
> {code:java}
> $ echo "get -b 
> kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics
>  status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l 
> localhost:8081 -n -v silent
> status = running;
> $
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] svudutala commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2020-08-12 Thread GitBox


svudutala commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-673145549


   @dongjinleekr What is timeline for merging this PR and making this  upgrade 
available?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #9172: KAFKA-10387: Fix inclusion of transformation configs when topic creation is enabled in Connect

2020-08-12 Thread GitBox


rhauch commented on a change in pull request #9172:
URL: https://github.com/apache/kafka/pull/9172#discussion_r469586764



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
##
@@ -318,4 +323,61 @@ public void testFilterOnHasHeaderKeyWithSourceConnector() 
throws Exception {
 // delete connector
 connect.deleteConnector(CONNECTOR_NAME);
 }
+
+/**
+ * Test the {@link Filter} transformer with a
+ * {@link HasHeaderKey} predicate on a source connector.
+ */
+@Test
+public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation() 
throws Exception {

Review comment:
   Sounds good. I just want to avoid someone trying to simplify the tests 
in the future without understanding that this test is verifying both features 
work together.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram merged pull request #9168: MINOR: Ensure same version of scala library is used for compile and at runtime

2020-08-12 Thread GitBox


rajinisivaram merged pull request #9168:
URL: https://github.com/apache/kafka/pull/9168


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9168: MINOR: Ensure same version of scala library is used for compile and at runtime

2020-08-12 Thread GitBox


rajinisivaram commented on pull request #9168:
URL: https://github.com/apache/kafka/pull/9168#issuecomment-673144455


   @ijuma Thanks for the review, streams test failures not related. Merging to 
trunk, 2.6, 2.5 and 2.4.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9173: KAFKA-10122: Consumer should allow heartbeat during rebalance

2020-08-12 Thread GitBox


ableegoldman commented on pull request #9173:
URL: https://github.com/apache/kafka/pull/9173#issuecomment-673143739


   Closing this since I didn't see the existing PR for this bugfix: 
https://github.com/apache/kafka/pull/8834



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman closed pull request #9173: KAFKA-10122: Consumer should allow heartbeat during rebalance

2020-08-12 Thread GitBox


ableegoldman closed pull request #9173:
URL: https://github.com/apache/kafka/pull/9173


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-08-12 Thread GitBox


ableegoldman commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r469582284



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
##
@@ -289,6 +294,25 @@ public static Sensor processorAtSourceSensorOrForwardSensor(final String threadI
         return processAtSourceSensor(threadId, taskId, processorNodeId, streamsMetrics);
     }
 
+    public static Sensor e2ELatencySensor(final String threadId,
+                                          final String taskId,
+                                          final String processorNodeId,
+                                          final StreamsMetricsImpl streamsMetrics) {
+        final String sensorName = processorNodeId + "-" + RECORD_E2E_LATENCY;
+        final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, sensorName, RecordingLevel.INFO);

Review comment:
   There's a slight hiccup with moving the INFO metrics from task to node 
level:
   We get the current sensor from `StreamsMetrics#taskLevelSensor` which 
computes the `fullSensorName` with the `#taskSensorPrefix`
If we instead use `StreamsMetrics#nodeLevelSensor` then the `fullSensorName` 
is constructed from the `#nodeSensorPrefix`, which is obviously different. So 
moving this to a “true” node level sensor would be a breaking change, IIUC
   I think the best we can do is just move this from TaskMetrics to 
ProcessorNodeMetrics, but still leave it as a taskLevelSensor. Let me know if 
I'm missing something though cc @guozhangwang @cadonna 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-08-12 Thread GitBox


ableegoldman commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r469582284



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
##
@@ -289,6 +294,25 @@ public static Sensor processorAtSourceSensorOrForwardSensor(final String threadI
         return processAtSourceSensor(threadId, taskId, processorNodeId, streamsMetrics);
     }
 
+    public static Sensor e2ELatencySensor(final String threadId,
+                                          final String taskId,
+                                          final String processorNodeId,
+                                          final StreamsMetricsImpl streamsMetrics) {
+        final String sensorName = processorNodeId + "-" + RECORD_E2E_LATENCY;
+        final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, sensorName, RecordingLevel.INFO);

Review comment:
   There's a slight hiccup with moving the INFO metrics from task to node 
level:
   We get the current sensor from `StreamsMetrics#taskLevelSensor` which 
computes the `fullSensorName` with the `#taskSensorPrefix`
If we instead use `StreamsMetrics#nodeLevelSensor` then the `fullSensorName` 
is constructed from the `#nodeSensorPrefix`, which is obviously different. So 
moving this to a “true” node level sensor would be a breaking change, IIUC
   I think the best we can do is just move this from TaskMetrics to 
ProcessorNodeMetrics, but still leave it as a taskLevelSensor. Let me know if 
I'm missing something though





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #9172: KAFKA-10387: Fix inclusion of transformation configs when topic creation is enabled in Connect

2020-08-12 Thread GitBox


kkonstantine commented on a change in pull request #9172:
URL: https://github.com/apache/kafka/pull/9172#discussion_r469578195



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
##
@@ -318,4 +323,61 @@ public void testFilterOnHasHeaderKeyWithSourceConnector() 
throws Exception {
 // delete connector
 connect.deleteConnector(CONNECTOR_NAME);
 }
+
+/**
+ * Test the {@link Filter} transformer with a
+ * {@link HasHeaderKey} predicate on a source connector.
+ */
+@Test
+public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation() 
throws Exception {

Review comment:
   Since we have a few other tests, replacing should be fine. 
   
   I'll add comments. But also, the test will fail if we don't create it (I 
removed the admin command) because we disable auto topic creation at the 
broker. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #9172: KAFKA-10387: Fix inclusion of transformation configs when topic creation is enabled in Connect

2020-08-12 Thread GitBox


kkonstantine commented on a change in pull request #9172:
URL: https://github.com/apache/kafka/pull/9172#discussion_r469577371



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##
@@ -129,7 +129,8 @@ public SourceConnectorConfig(Plugins plugins, Map<String, String> props, boolean
         propsWithoutRegexForDefaultGroup.entrySet()
                 .removeIf(e -> e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX + INCLUDE_REGEX_CONFIG)
                         || e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX + EXCLUDE_REGEX_CONFIG));
-        enrichedSourceConfig = new EnrichedSourceConnectorConfig(enrich(defaultConfigDef, props,
+        enrichedSourceConfig = new EnrichedSourceConnectorConfig(plugins,
+                enrich(defaultConfigDef, props,
                 defaultGroup), propsWithoutRegexForDefaultGroup);

Review comment:
   Good catch. The break was added by accident there.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #9172: KAFKA-10387: Fix inclusion of transformation configs when topic creation is enabled in Connect

2020-08-12 Thread GitBox


kkonstantine commented on pull request #9172:
URL: https://github.com/apache/kafka/pull/9172#issuecomment-673136551


   According to https://status.apache.org, the Jenkins queue has grown too much 
and the Infra team is investigating how to resolve the issues. Thus, the tests 
haven't run yet.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #9172: KAFKA-10387: Fix inclusion of transformation configs when topic creation is enabled in Connect

2020-08-12 Thread GitBox


rhauch commented on a change in pull request #9172:
URL: https://github.com/apache/kafka/pull/9172#discussion_r469574130



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java
##
@@ -318,4 +323,61 @@ public void testFilterOnHasHeaderKeyWithSourceConnector() 
throws Exception {
 // delete connector
 connect.deleteConnector(CONNECTOR_NAME);
 }
+
+/**
+ * Test the {@link Filter} transformer with a
+ * {@link HasHeaderKey} predicate on a source connector.
+ */
+@Test
+public void testFilterOnHasHeaderKeyWithSourceConnectorAndTopicCreation() 
throws Exception {

Review comment:
   Do we need another test here, or could we essentially replace the 
previous test with this one?
   
   Either way, the test that uses topic creation should probably have a comment 
(either in the method JavaDoc or in a line comment below) that explicitly 
mentions enabling topic creation. We don't want someone removing the topic 
creation configs at a later time.
   
   WDYT?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##
@@ -129,7 +129,8 @@ public SourceConnectorConfig(Plugins plugins, Map<String, String> props, boolean
         propsWithoutRegexForDefaultGroup.entrySet()
                 .removeIf(e -> e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX + INCLUDE_REGEX_CONFIG)
                         || e.getKey().equals(DEFAULT_TOPIC_CREATION_PREFIX + EXCLUDE_REGEX_CONFIG));
-        enrichedSourceConfig = new EnrichedSourceConnectorConfig(enrich(defaultConfigDef, props,
+        enrichedSourceConfig = new EnrichedSourceConnectorConfig(plugins,
+                enrich(defaultConfigDef, props,
                 defaultGroup), propsWithoutRegexForDefaultGroup);

Review comment:
   Super tiny nit: the wrapping even from the original was confusing, and 
maybe it's worth changing one more line and cleaning up the wrapping:
   ```suggestion
   enrichedSourceConfig = new EnrichedSourceConnectorConfig(plugins,
   enrich(defaultConfigDef, props, defaultGroup), 
   propsWithoutRegexForDefaultGroup);
   ```
   WDYT?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10387) Cannot include SMT configs with source connector that uses topic.creation.* properties

2020-08-12 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10387:
--
Priority: Critical  (was: Major)

> Cannot include SMT configs with source connector that uses topic.creation.* 
> properties
> --
>
> Key: KAFKA-10387
> URL: https://issues.apache.org/jira/browse/KAFKA-10387
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Arjun Satish
>Assignee: Konstantine Karantasis
>Priority: Critical
>
> Let's say we try to create a connector with the following config:
> {code:java}
> {
>   "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
>   "tasks.max": "1",
>   "database.hostname": "localhost",
>   "database.port": 32843,
>   "database.user": "test",
>   "database.password": "test",
>   "database.dbname": "test",
>   "database.server.name": "tcpsql",
>   "table.whitelist": "public.numerics",
>   "transforms": "abc",
>   "transforms.abc.type": "io.debezium.transforms.ExtractNewRecordState",
>   "topic.creation.default.partitions": "1",
>   "topic.creation.default.replication.factor": "1"
> }
> {code}
> this fails with the following error in the Connector worker:
> {code:java}
> [2020-08-11 02:47:05,908] ERROR Failed to start task deb-0 
> (org.apache.kafka.connect.runtime.Worker:560)
> org.apache.kafka.connect.errors.ConnectException: 
> org.apache.kafka.common.config.ConfigException: Unknown configuration 
> 'transforms.abc.type'
>   at 
> org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:296)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:605)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:555)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1251)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:127)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1266)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1262)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
>   at java.base/java.lang.Thread.run(Thread.java:832)
> Caused by: org.apache.kafka.common.config.ConfigException: Unknown 
> configuration 'transforms.abc.type'
>   at 
> org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:159)
>   at 
> org.apache.kafka.connect.runtime.SourceConnectorConfig$EnrichedSourceConnectorConfig.get(SourceConnectorConfig.java:57)
>   at 
> org.apache.kafka.connect.runtime.SourceConnectorConfig.get(SourceConnectorConfig.java:141)
>   at 
> org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:216)
>   at 
> org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:281)
>   ... 10 more
> {code}
> Connector creation works fine if we remove the topic.creation properties 
> above. 
> Not entirely sure, but it looks like the piece of code that might need a fix 
> is here (as it does not add {{transforms.*}} configs into the returned 
> ConfigDef instances): 
> https://github.com/apache/kafka/blob/2.6.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java#L94



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-12 Thread GitBox


d8tltanc commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-673114249


   @abbccdda Thanks for the review offers! I'll rebase this PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-12 Thread GitBox


d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468736687



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -721,6 +730,11 @@ protected Node curNode() {
 return curNode;
 }
 
+final void incrementRetryBackoff(Call failedCall, long now) {

Review comment:
   For DescribeConsumerGroup and ListOffsets, AdminClient will construct a 
chain of requests. For example, DescribeConsumerGroup will send a 
FindCoordinatorRequest and then a DescribeGroupRequest (Denote them as F and 
D). 
   
   Let's consider this case:
   F (Error, tries = 0) -> F (OK, tries = 1) -> D (Error, tries = 1) -> F (OK, 
tries = 2) -> D (OK, tries = 2)
   
   F may come after either a failed F or a failed D, so it should increase its 
attempts based on the attempts of either F or D.
   D will only come after a succeeded F, so it should increase its attempts 
based on the attempts of F.
   
   Both F and D need to be aware of the number of attempts of the previously 
failed request to set their own attempts properly. Since F and D are separate 
objects and do not share any state, we can probably only pass the failed call 
instance along to construct their status properly.
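
   As an aside, that attempts counter is what feeds the KIP-580 backoff. A minimal sketch of the computation (an editor's addition; the constants and jitter range are illustrative, not the client's actual defaults):

   ```java
   import java.util.concurrent.ThreadLocalRandom;

   final class BackoffSketch {
       // Doubles the wait per attempt, caps it at maxMs, and adds jitter so
       // that many clients retrying at once do not synchronize.
       static long backoffMs(final int attempts, final long baseMs, final long maxMs) {
           final double expBound = baseMs * Math.pow(2, attempts);
           final double jitter = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
           return (long) Math.min(maxMs, expBound * jitter);
       }
   }
   ```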





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei removed a comment on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-08-12 Thread GitBox


vvcephei removed a comment on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673100824


   @cadonna : 
   
   ```
   > Task :streams:checkstyleMain
   [ant:checkstyle] [ERROR] 
/home/confluent/kafka/streams/src/generated/java/org/apache/kafka/streams/InteractiveQueryPartitionResponse.java:1:
 Line does not match expected header line of '/*'. [Header]
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-08-12 Thread GitBox


vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673100824


   @cadonna : 
   
   ```
   > Task :streams:checkstyleMain
   [ant:checkstyle] [ERROR] 
/home/confluent/kafka/streams/src/generated/java/org/apache/kafka/streams/InteractiveQueryPartitionResponse.java:1:
 Line does not match expected header line of '/*'. [Header]
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

2020-08-12 Thread GitBox


guozhangwang commented on a change in pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#discussion_r469518409



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -58,6 +58,9 @@ static boolean checkpointNeeded(final boolean 
enforceCheckpoint,
 return false;
 }
 
+if (enforceCheckpoint)

Review comment:
   The null snapshot occurs for both active and standby tasks before `initializeIfNeeded` is triggered; in that case, we should not overwrite the checkpoint even if it was enforced.
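   A hedged sketch of the guard being discussed (the method shape follows the quoted diff, but the snapshot type and surrounding logic are assumptions):
   
   ```java
   import java.util.Map;
   
   // Sketch: a null "old" snapshot means the task was never initialized, so
   // there is nothing to checkpoint even when a checkpoint is enforced.
   public class CheckpointNeededSketch {
   
       static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                       final Map<String, Long> oldOffsetSnapshot,
                                       final Map<String, Long> newOffsetSnapshot) {
           if (oldOffsetSnapshot == null) {
               return false; // not initialized yet; skip even an enforced checkpoint
           }
           if (enforceCheckpoint) {
               return true;  // e.g. removing corrupted partitions must be persisted
           }
           // Otherwise, checkpoint only if the offsets actually moved.
           return !oldOffsetSnapshot.equals(newOffsetSnapshot);
       }
   
       public static void main(final String[] args) {
           System.out.println(checkpointNeeded(true, null, Map.of()));                     // false
           System.out.println(checkpointNeeded(true, Map.of("cp", 5L), Map.of("cp", 7L))); // true
       }
   }
   ```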





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

2020-08-12 Thread GitBox


guozhangwang commented on a change in pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#discussion_r469517595



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -193,13 +193,17 @@ private void closeAndRevive(final Map> taskWith
 
 try {
 task.suspend();
+// we need to enforce a checkpoint that removes the corrupted 
partitions
+task.postCommit(true);

Review comment:
   Here, since the task is guaranteed to be in SUSPENDED state, `postCommit()` is ensured to trigger checkpointing.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1609,12 +1609,11 @@ public void shouldReturnStateManagerChangelogOffsets() {
 }
 
 @Test
-public void shouldCheckpointWithCreatedStateOnClose() {
+public void shouldNotCheckpointOnCloseCreated() {

Review comment:
   It is the same as the other one: the test was wrong to begin with.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

2020-08-12 Thread GitBox


guozhangwang commented on pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#issuecomment-673090664


   > Do you have any theories as to why this was causing 
`StreamsUpgradeTest.test_app_upgrade` to be flaky? Do we really get that many 
TaskCorruptedExceptions in this system test?
   
   I observed that several of the failed tests are due to an endless loop of task corruption. Basically, we shut down all applications and then restart them; in between, log truncation may be triggered and hence cause restoration to hit an out-of-range offset. Due to this bug we would stay in that loop forever.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-08-12 Thread GitBox


vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673090192


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2020-08-12 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176582#comment-17176582
 ] 

Guozhang Wang commented on KAFKA-10357:
---

I've thought about relying on the committed offsets, but that is not 100% reliable either, since it is possible that the commit has not been sent while some data has already been sent to the repartition topics, and hence is lost due to topic deletion. I agree that KAFKA-3370 is not theoretically sound, but I think it is sufficient for the near term. For a longer-term solution I feel we'd have to push this to the user's control (via {{#initialize}} for example).

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Repartition topics are both written by Streams' producer and read by Streams'
> consumer, so when they are accidentally deleted both clients may be notified.
> But in practice the consumer would react to it much quicker than the producer
> since the latter has a delivery timeout expiration period (see
> https://issues.apache.org/jira/browse/KAFKA-10356). When the consumer reacts to
> it, it will re-join the group since metadata changed, and during the triggered
> rebalance it would auto-recreate the topic silently and continue, causing
> silent data loss.
> One idea is to only create all repartition topics *once* in the first
> rebalance and not auto-create them anymore in future rebalances; instead it
> would be treated similarly to the INCOMPLETE_SOURCE_TOPIC_METADATA error code
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenging part would be how to determine if it is the first-ever
> rebalance, and there are several wild ideas I'd like to throw out here:
> 1) change the thread state transition diagram so that STARTING state would
> not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the
> assign function we can check if the state is still in CREATED and not RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the
> first-ever rebalance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-08-12 Thread GitBox


vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673090128


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-08-12 Thread GitBox


vvcephei commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-673090023


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-12 Thread GitBox


abbccdda commented on pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#issuecomment-673088220


   @d8tltanc Thanks for the great work! I could resume reviewing this PR 
sometime next week, do you mind rebasing it?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

2020-08-12 Thread GitBox


abbccdda commented on a change in pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#discussion_r469509680



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -193,13 +193,17 @@ private void closeAndRevive(final Map> taskWith
 
 try {
 task.suspend();
+// we need to enforce a checkpoint that removes the corrupted 
partitions
+task.postCommit(true);
 } catch (final RuntimeException swallow) {
 log.error("Error suspending corrupted task {} ", task.id(), 
swallow);
 }
 task.closeDirty();
+
+// For active tasks pause their input partitions so we won't poll 
any more records
+// for this task until it has been re-initialized;
+// Note, closeDirty already clears the partitiongroup for the task.

Review comment:
   partition group

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -193,13 +193,17 @@ private void closeAndRevive(final Map> taskWith
 
 try {
 task.suspend();
+// we need to enforce a checkpoint that removes the corrupted 
partitions
+task.postCommit(true);

Review comment:
   I'm not sure this 100% ensures the snapshot gets done, as we have branching logic in `postCommit`. Do you think we should just add a helper to clean up instead?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -58,6 +58,9 @@ static boolean checkpointNeeded(final boolean 
enforceCheckpoint,
 return false;
 }
 
+if (enforceCheckpoint)

Review comment:
   Could we move this logic to `StandbyTask` only? It is the only case I have seen that could have a null snapshot passed in; that would let this helper assume both snapshots are non-null.
   ```
   if (oldOffsetSnapshot == null) {
       return false;
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #9173: KAFKA-10122: Consumer should allow heartbeat during rebalance

2020-08-12 Thread GitBox


ableegoldman opened a new pull request #9173:
URL: https://github.com/apache/kafka/pull/9173


   We launched a Streams application reading from a single 3000-partition topic and saw continuous rebalancing. Digging into the logs, every time the leader sent a SyncGroup request it would discover that it had dropped out of the group and needed to rejoin. The assignment seemed to take slightly longer than 10s, the session timeout, so the leader appeared to be getting kicked out due to heartbeat expiration while the heartbeat thread was disabled.
   
   I redeployed the app with this exact patch and saw it stabilize at last. The application was left running for ~20 hours or so and never rebalanced again after the last pod was rolled.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #9172: KAFKA-10387: Fix inclusion of transformation configs when topic creation is enabled in Connect

2020-08-12 Thread GitBox


kkonstantine commented on pull request #9172:
URL: https://github.com/apache/kafka/pull/9172#issuecomment-673067506


   cc @rhauch @wicknicks 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #9172: KAFKA-10387: Fix inclusion of transformation configs when topic creation is enabled in Connect

2020-08-12 Thread GitBox


kkonstantine commented on pull request #9172:
URL: https://github.com/apache/kafka/pull/9172#issuecomment-673065882


   I'll be adding a few more tests. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine opened a new pull request #9172: KAFKA-10387: Fix inclusion of transformation configs when topic creation is enabled in Connect

2020-08-12 Thread GitBox


kkonstantine opened a new pull request #9172:
URL: https://github.com/apache/kafka/pull/9172


   The addition of configs for custom topic creation with KIP-158 created a regression when transformation configs are also included in the configuration of a source connector.
   
   To reproduce the issue, enabling topic creation at the worker alone is not sufficient. A user also needs to supply a source connector configuration that contains both transformations and custom topic creation properties.
   
   The issue is that the enrichment of configs in `SourceConnectorConfig` happens on top of an `AbstractConfig` rather than a `ConnectorConfig`. Inheriting from the latter allows enrichment to be composable for both topic creation and transformations, as sketched below.
   
   Unit tests and integration tests are written to test these combinations.
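   A minimal illustration of that composability point using plain `ConfigDef` (the property names are stand-ins, not Connect's real definitions):
   
   ```java
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.common.config.ConfigDef.Importance;
   import org.apache.kafka.common.config.ConfigDef.Type;
   
   // Each enrichment step adds its keys on top of whatever definition it is
   // handed; enriching a bare base config instead would silently drop the
   // other step's keys from validation.
   public class ComposableEnrichmentSketch {
   
       static ConfigDef base() {
           return new ConfigDef()
               .define("name", Type.STRING, Importance.HIGH, "Connector name");
       }
   
       static ConfigDef enrichWithTransforms(final ConfigDef def) {
           return def.define("transforms", Type.LIST, "", Importance.LOW, "Transformation aliases");
       }
   
       static ConfigDef enrichWithTopicCreation(final ConfigDef def) {
           return def.define("topic.creation.groups", Type.LIST, "", Importance.LOW, "Topic creation groups");
       }
   
       public static void main(final String[] args) {
           final ConfigDef composed = enrichWithTopicCreation(enrichWithTransforms(base()));
           System.out.println(composed.names()); // [name, transforms, topic.creation.groups]
       }
   }
   ```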
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10393) Message for fetch snapshot and fetch

2020-08-12 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-10393:
---
Summary: Message for fetch snapshot and fetch  (was: KIP-630: message for 
fetch snapshot and fetch)

> Message for fetch snapshot and fetch
> 
>
> Key: KAFKA-10393
> URL: https://issues.apache.org/jira/browse/KAFKA-10393
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10394) Internal API for generating snapshots

2020-08-12 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-10394:
--

 Summary: Internal API for generating snapshots
 Key: KAFKA-10394
 URL: https://issues.apache.org/jira/browse/KAFKA-10394
 Project: Kafka
  Issue Type: Sub-task
  Components: replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469476195



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List users, 
DescribeUserScramCredentialsOptions options) {
+final KafkaFutureImpl> 
future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+new ControllerNodeProvider()) {

Review comment:
   Ok, switching Describe to **not** require that it be done on the 
controller.
   
   > planning on getting rid of ControllerNodeProvider as part of KIP-590 
   
   Leaving Alter alone for now under the assumption that we will fix it as part 
of the KIP-590 PR.  Let me know if you wish me to change this now instead.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10393) KIP-630: message for fetch snapshot and fetch

2020-08-12 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-10393:
---
Summary: KIP-630: message for fetch snapshot and fetch  (was: KIP-630: 
messafe for fetch snapshot and fetch)

> KIP-630: message for fetch snapshot and fetch
> -
>
> Key: KAFKA-10393
> URL: https://issues.apache.org/jira/browse/KAFKA-10393
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-12 Thread GitBox


d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468736687



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -721,6 +730,11 @@ protected Node curNode() {
 return curNode;
 }
 
+final void incrementRetryBackoff(Call failedCall, long now) {

Review comment:
   For DescribeConsumerGroup and ListOffsets, AdminClient will construct a chain of requests. For example, DescribeConsumerGroup will send a FindCoordinatorRequest and then a DescribeGroupRequest (denote them as F and D).
   
   Let's consider this case:
   F (Error, tries = 0) -> F (OK, tries = 1) -> D (Error, tries = 1) -> F (OK, tries = 2) -> D (OK, tries = 2)
   
   Both F and D need to be aware of the number of attempts of the previously failed request to set their own attempts properly. Since F and D are separate objects and do not share any memory region, we can probably only pass the failed call instance to construct their status properly.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-12 Thread GitBox


d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r468736687



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -721,6 +730,11 @@ protected Node curNode() {
 return curNode;
 }
 
+final void incrementRetryBackoff(Call failedCall, long now) {

Review comment:
   For DescribeConsumerGroup and ListOffsets, AdminClient will construct a chain of requests. For example, DescribeConsumerGroup will send a FindCoordinatorRequest and then a DescribeGroupRequest (denote them as F and D).
   
   Let's consider this case:
   F (OK, tries = 0) -> D (Error, tries = 0) -> F (OK, tries = 1) -> D (Error, tries = 1) -> F (OK, tries = 2) -> D (OK, tries = 2)
   
   Both F and D need to be aware of the number of attempts of the previously failed request to set their own attempts properly. Since F and D are separate objects and do not share any memory region, we can probably only pass the failed call instance to construct their status properly.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-12 Thread GitBox


d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r469473196



##
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##
@@ -74,22 +76,28 @@
 private final ClusterResourceListeners clusterResourceListeners;
 private boolean isClosed;
 private final Map lastSeenLeaderEpochs;
+private final ExponentialBackoff refreshBackoff;
+final static double RETRY_BACKOFF_JITTER = 0.2;

Review comment:
   Makes sense. Moved them into `CommonClientConfigs` for reuse.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-12 Thread GitBox


d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r469472239



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be 
provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;

Review comment:
   Resolved as this class has been merged.

##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be 
provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;
+private final double jitter;
+
+public GeometricProgression(long scaleFactor, int ratio, long termMax, 
double jitter) {
+this.scaleFactor = scaleFactor;
+this.ratio = ratio;
+this.jitter = jitter;
+this.expMax = termMax > scaleFactor ?

Review comment:
   Resolved as this class has been merged in another PR.

##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be 
provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {

Review comment:
   Resolved as this class has been merged in another PR.

##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java

[GitHub] [kafka] d8tltanc commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

2020-08-12 Thread GitBox


d8tltanc commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r469471723



##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be 
provided
+ * This class is thread-safe
+ */
+public class GeometricProgression {
+private final int ratio;
+private final double expMax;
+private final long scaleFactor;
+private final double jitter;
+
+public GeometricProgression(long scaleFactor, int ratio, long termMax, 
double jitter) {
+this.scaleFactor = scaleFactor;

Review comment:
   Resolved as this class has been merged in another PR.

##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...
+ * The formula is Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * 
(ratio) ^ n
+ * If scaleFactor is greater or equal than termMax, a constant term of will be 
provided

Review comment:
   Resolved as this class has been merged in another PR.

##
File path: 
clients/src/main/java/org/apache/kafka/common/utils/GeometricProgression.java
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * An util class for exponential backoff, backoff, etc...

Review comment:
   Resolved as this class has been merged in another PR.
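   For reference, a self-contained sketch of the formula quoted in the javadoc above, Term(n) = random(1 - jitter, 1 + jitter) * scaleFactor * ratio^n with the base term capped at termMax; the class and method names here are assumptions, not the merged implementation:
   
   ```java
   import java.util.concurrent.ThreadLocalRandom;
   
   // Hedged sketch of the quoted backoff formula; the merged class may differ.
   public class GeometricBackoffSketch {
       private final long scaleFactor; // initial term
       private final int ratio;        // growth ratio per attempt
       private final long termMax;     // upper bound on the base term
       private final double jitter;    // +/- randomization fraction
   
       public GeometricBackoffSketch(final long scaleFactor, final int ratio,
                                     final long termMax, final double jitter) {
           this.scaleFactor = scaleFactor;
           this.ratio = ratio;
           this.termMax = termMax;
           this.jitter = jitter;
       }
   
       // Base term = min(scaleFactor * ratio^n, termMax), then jitter is applied.
       // If scaleFactor >= termMax, a constant term is provided, as the javadoc states.
       public long term(final int n) {
           if (scaleFactor >= termMax) {
               return scaleFactor;
           }
           final double base = Math.min(scaleFactor * Math.pow(ratio, n), termMax);
           final double rand = ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
           return (long) (rand * base);
       }
   
       public static void main(final String[] args) {
           final GeometricBackoffSketch backoff = new GeometricBackoffSketch(100, 2, 1000, 0.2);
           for (int n = 0; n < 6; n++) {
               System.out.println("attempt " + n + " -> " + backoff.term(n) + " ms");
           }
       }
   }
   ```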





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] serjchebotarev commented on pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-08-12 Thread GitBox


serjchebotarev commented on pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#issuecomment-673049249


   @ableegoldman alright, thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-12 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r469467881



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1210,28 +1218,20 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  private def expandIsr(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-val zkVersionOpt = stateStore.expandIsr(controllerEpoch, newLeaderAndIsr)
-maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
-  }
+  private def expandIsr(newInSyncReplica: Int): Unit = {
+pendingInSyncReplicaIds += newInSyncReplica
+info(s"Adding new in-sync replica $newInSyncReplica. Pending ISR updated 
to [${pendingInSyncReplicaIds.mkString(",")}]")
 
-  private[cluster] def shrinkIsr(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-maybeUpdateIsrAndVersion(newIsr, zkVersionOpt)
+val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
pendingInSyncReplicaIds.toList, zkVersion)
+alterIsrChannelManager.enqueueIsrUpdate(AlterIsrItem(topicPartition, 
newLeaderAndIsr))
   }
 
-  private[cluster] def maybeUpdateIsrAndVersion(isr: Set[Int], zkVersionOpt: 
Option[Int]): Unit = {
-zkVersionOpt match {
-  case Some(newVersion) =>
-inSyncReplicaIds = isr
-zkVersion = newVersion
-info("ISR updated to [%s] and zkVersion updated to 
[%d]".format(isr.mkString(","), zkVersion))
+  private[cluster] def shrinkIsr(outOfSyncReplicas: Set[Int]): Unit = {
+pendingInSyncReplicaIds --= outOfSyncReplicas

Review comment:
   I'm currently looking at the effective ISR to find new out-of-sync replicas. This can include new ISR members which haven't made it into the "true" ISR via LeaderAndIsr yet (like broker=3 in your example). Maybe we should only consider removing ISR members iff they are in the true ISR, IOW changing from
   
   ```scala
   val candidateReplicaIds = effectiveInSyncReplicaIds - localBrokerId
   ```
   to
   ```scala
   val candidateReplicaIds = inSyncReplicaIds - localBrokerId
   ```
   
   Also, I wonder if the batching that's happening in AlterIsrChannelManager 
violates the model. It sends the request asynchronously with a small delay, so 
multiple ISR changes can be batched into one AlterIsr.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2020-08-12 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176535#comment-17176535
 ] 

Sophie Blee-Goldman commented on KAFKA-10357:
-

I think the elegant way to shut down the whole application is pretty straightforward: for that we can just trigger a rebalance and encode an error (like we do for missing source topics, but less silently). The rest I'm not so sure about. If we want to solve this "right away" then breaking compatibility isn't really an option; if it can wait for 3.0 then the "KafkaStreams#initialize" type solution is on the table.

The KAFKA-3370 idea is intriguing but also doesn't seem perfectly safe. Maybe we first need to decide whether it's acceptable to solve this problem for only 99% of cases (or whatever number less than 100).

On the other hand, we just need some way to infer whether the app is new or not from some kind of persisted information. Can we leverage the committed offsets somehow? It seems like if the repartition topics don't exist but the group has committed offsets for them, then they must have been deleted.
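A hedged sketch of that inference using public AdminClient APIs (the group id and the "-repartition" suffix check are illustrative assumptions):

{code:java}
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch: a repartition topic with committed progress that no longer exists
// was most likely deleted, so the app cannot be brand new.
public class DeletedRepartitionTopicCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            Set<String> existing = admin.listTopics().names().get();
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets("my-streams-app") // group id == application.id
                     .partitionsToOffsetAndMetadata().get();
            for (TopicPartition tp : committed.keySet()) {
                if (tp.topic().endsWith("-repartition") && !existing.contains(tp.topic())) {
                    System.out.println("Repartition topic was deleted: " + tp.topic());
                }
            }
        }
    }
}
{code}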

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Repartition topics are both written by Streams' producer and read by Streams'
> consumer, so when they are accidentally deleted both clients may be notified.
> But in practice the consumer would react to it much quicker than the producer
> since the latter has a delivery timeout expiration period (see
> https://issues.apache.org/jira/browse/KAFKA-10356). When the consumer reacts to
> it, it will re-join the group since metadata changed, and during the triggered
> rebalance it would auto-recreate the topic silently and continue, causing
> silent data loss.
> One idea is to only create all repartition topics *once* in the first
> rebalance and not auto-create them anymore in future rebalances; instead it
> would be treated similarly to the INCOMPLETE_SOURCE_TOPIC_METADATA error code
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenging part would be how to determine if it is the first-ever
> rebalance, and there are several wild ideas I'd like to throw out here:
> 1) change the thread state transition diagram so that STARTING state would
> not transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the
> assign function we can check if the state is still in CREATED and not RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the
> first-ever rebalance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-12 Thread GitBox


mumrah commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r469460462



##
File path: core/src/main/scala/kafka/server/AlterIsrChannelManager.scala
##
@@ -0,0 +1,132 @@
+package kafka.server
+
+import java.util
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import kafka.api.LeaderAndIsr
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.clients.ClientResponse
+import org.apache.kafka.common.TopicPartition
+import 
org.apache.kafka.common.message.AlterIsrRequestData.{AlterIsrRequestPartitions, 
AlterIsrRequestTopics}
+import org.apache.kafka.common.message.{AlterIsrRequestData, 
AlterIsrResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterIsrRequest, AlterIsrResponse}
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Handles the sending of AlterIsr requests to the controller. Updating the 
ISR is an asynchronous operation,
+ * so partitions will learn about updates through LeaderAndIsr messages sent 
from the controller
+ */
+trait AlterIsrChannelManager {
+  val IsrChangePropagationBlackOut = 5000L

Review comment:
   We don't; these were copied over from the ReplicaManager's ISR propagation logic. I'll clean this up.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-12 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r469457068



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeUserScramCredentialsResult 
describeUserScramCredentials(List users, 
DescribeUserScramCredentialsOptions options) {
+final KafkaFutureImpl> 
future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+Call call = new Call("describeUserScramCredentials", 
calcDeadlineMs(now, options.timeoutMs()),
+new ControllerNodeProvider()) {
+@Override
+public DescribeUserScramCredentialsRequest.Builder 
createRequest(int timeoutMs) {
+return new DescribeUserScramCredentialsRequest.Builder(
+new 
DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user ->
+new 
DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList())));
+}
+
+@Override
+public void handleResponse(AbstractResponse abstractResponse) {
+DescribeUserScramCredentialsResponse response = 
(DescribeUserScramCredentialsResponse) abstractResponse;
+Errors error = Errors.forCode(response.data().error());
+switch (error) {
+case NONE:
+DescribeUserScramCredentialsResponseData data = 
response.data();
+
future.complete(data.userScramCredentials().stream().collect(Collectors.toMap(
+
DescribeUserScramCredentialsResponseData.UserScramCredential::name,
+userScramCredential -> {
+List scramCredentialInfos 
= userScramCredential.credentialInfos().stream().map(
+credentialInfo -> new 
ScramCredentialInfo(ScramMechanism.from(credentialInfo.mechanism()), 
credentialInfo.iterations()))
+.collect(Collectors.toList());
+return new 
UserScramCredentialsDescription(userScramCredential.name(), 
scramCredentialInfos);
+})));
+break;
+case NOT_CONTROLLER:
+handleNotControllerError(error);
+break;
+default:
+future.completeExceptionally(new ApiError(error, 
response.data().errorMessage()).exception());
+break;
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+future.completeExceptionally(throwable);
+}
+};
+runnable.call(call, now);
+return new DescribeUserScramCredentialsResult(future);
+}
+
+@Override
+public AlterUserScramCredentialsResult 
alterUserScramCredentials(List alterations,
+ 
AlterUserScramCredentialsOptions options) {
+final long now = time.milliseconds();
+final Map> futures = new HashMap<>();
+for (UserScramCredentialAlteration alteration: alterations) {
+futures.put(alteration.getUser(), new KafkaFutureImpl<>());
+}
+final Map userIllegalAlterationExceptions = new 
HashMap<>();
+// We need to keep track of users with deletions of an unknown SCRAM 
mechanism
+alterations.stream().filter(a -> a instanceof 
UserScramCredentialDeletion).forEach(alteration -> {
+UserScramCredentialDeletion deletion = 
(UserScramCredentialDeletion) alteration;
+ScramMechanism mechanism = deletion.getMechanism();
+if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) {
+userIllegalAlterationExceptions.put(deletion.getUser(), new 
IllegalArgumentException("Unknown SCRAM mechanism"));
+}
+});
+// Creating an upsertion may throw InvalidKeyException or 
NoSuchAlgorithmException,
+// so keep track of which users are affected by such a failure and 
immediately fail all their alterations
+final Map> userInsertions 
= new HashMap<>();
+alterations.stream().filter(a -> a instanceof 
UserScramCredentialUpsertion)
+.filter(alteration -> 
!userIllegalAlterationExceptions.containsKey(alteration.getUser()))
+.forEach(alteration -> {
+UserScramCredentialUpsertion upsertion = 
(UserScramCredentialUpsertion) alteration;
+String user = upsertion.getUser();
+try {
+Scr

[GitHub] [kafka] ableegoldman commented on pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-08-12 Thread GitBox


ableegoldman commented on pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#issuecomment-673036155


   @serjchebotarev he's out of office this week, just fyi. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9170: KAFKA-10391: Overwrite checkpoint in task corruption to remove corrupted partitions

2020-08-12 Thread GitBox


ableegoldman commented on a change in pull request #9170:
URL: https://github.com/apache/kafka/pull/9170#discussion_r469450345



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1609,12 +1609,11 @@ public void shouldReturnStateManagerChangelogOffsets() {
 }
 
 @Test
-public void shouldCheckpointWithCreatedStateOnClose() {
+public void shouldNotCheckpointOnCloseCreated() {

Review comment:
   Why did this test change?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1629,16 +1628,18 @@ public void shouldCheckpointWithCreatedStateOnClose() {
 assertFalse(source1.initialized);
 assertFalse(source1.closed);
 
+EasyMock.verify(stateManager, recordCollector);
+
 final double expectedCloseTaskMetric = 1.0;
 verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
 }
 
 @Test
-public void shouldNotCheckpointOnCloseRestoringIfNoProgress() {
+public void shouldCheckpointOnCloseRestoringIfNoProgress() {

Review comment:
   Same with this one, why did it change? Or was the test just wrong to 
begin with..?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9100: Add AlterISR RPC and use it for ISR modifications

2020-08-12 Thread GitBox


hachikuji commented on a change in pull request #9100:
URL: https://github.com/apache/kafka/pull/9100#discussion_r469388095



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -748,7 +755,7 @@ class Partition(val topicPartition: TopicPartition,
 leaderLogIfLocal match {
   case Some(leaderLog) =>
 // keep the current immutable replica list reference
-val curInSyncReplicaIds = inSyncReplicaIds
+val curInSyncReplicaIds = effectiveInSyncReplicaIds

Review comment:
   Related to the other comment, but we need to be careful with the min.isr 
check below. I think it is correct to wait for `effectiveInSyncReplicaIds` 
before acknowledging the produce request, but we should probably use the size 
of `inSyncReplicaIds` in the min.isr check since that is the only set we can 
guarantee.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterIsrRequest.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public class AlterIsrRequest extends AbstractRequest {
+
+private final AlterIsrRequestData data;
+
+public AlterIsrRequest(AlterIsrRequestData data, short apiVersion) {
+super(ApiKeys.ALTER_ISR, apiVersion);
+this.data = data;
+}
+
+public AlterIsrRequestData data() {
+return data;
+}
+
+@Override
+protected Struct toStruct() {
+return data.toStruct(version());
+}
+
+/**
+ * Get an error response for a request with specified throttle time in the 
response if applicable
+ *
+ * @param throttleTimeMs

Review comment:
   nit: maybe drop the parameters if they do not need to be documented

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -255,6 +255,10 @@ class Partition(val topicPartition: TopicPartition,
 
   def isAddingReplica(replicaId: Int): Boolean = 
assignmentState.isAddingReplica(replicaId)
 
+  // For advancing the HW we assume the largest ISR even if the controller 
hasn't made the change yet
+  // This set includes the latest ISR (as we learned from LeaderAndIsr) and 
any replicas from a pending ISR expansion
+  def effectiveInSyncReplicaIds: Set[Int] = inSyncReplicaIds | 
pendingInSyncReplicaIds

Review comment:
   We might need to be careful about performance here since this would get 
called on every follower fetch.

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -485,13 +490,11 @@ class Partition(val topicPartition: TopicPartition,
   def makeLeader(partitionState: LeaderAndIsrPartitionState,

Review comment:
   There is a "classic" edge case in Kafka which goes as follows:
   
   1. Leader is 1, ISR is [1, 2, 3]
   2. Broker 3 begins controlled shutdown. While awaiting shutdown, it 
continues fetching.
   3. Controller bumps epoch and shrinks ISR to [1, 2] and notifies replicas
   4. Before controlled shutdown completes and 3 stops fetching, the leader 
adds it back to the ISR.
   
   This bug was fixed by KIP-320 which added epoch validation to the Fetch API. 
After shrinking the ISR in step 3, the controller will send `LeaderAndIsr` with 
the updated epoch to [1, 2] and `StopReplica` to [3]. So 3 will not send any 
fetches with the updated epoch, which means it's impossible for the leader to 
add 3 back after observing the shrink to [1, 2]. 
   
   I just want to make sure whether above is correct and whether `AlterIsr` 
changes it in any way. I think the answer is no as long as ISR expansion is 
_only_ done in response to a fetch request, but it's worth double-checking.

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1756,6 +1761,141 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  def alterIsrs(alterIsrRequest: AlterIsrRequestData, callback: 
AlterIsrResponseData => 

[jira] [Created] (KAFKA-10393) KIP-630: messafe for fetch snapshot and fetch

2020-08-12 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-10393:
--

 Summary: KIP-630: messafe for fetch snapshot and fetch
 Key: KAFKA-10393
 URL: https://issues.apache.org/jira/browse/KAFKA-10393
 Project: Kafka
  Issue Type: Sub-task
  Components: replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang merged pull request #9171: MINOR: add ableegoldman and cadonna to whitelist

2020-08-12 Thread GitBox


guozhangwang merged pull request #9171:
URL: https://github.com/apache/kafka/pull/9171


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9171: MINOR: add ableegoldman and cadonna to whitelist

2020-08-12 Thread GitBox


ableegoldman commented on pull request #9171:
URL: https://github.com/apache/kafka/pull/9171#issuecomment-673010059


   cc @vvcephei @guozhangwang 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #9171: MINOR: add ableegoldman and cadonna to whitelist

2020-08-12 Thread GitBox


ableegoldman opened a new pull request #9171:
URL: https://github.com/apache/kafka/pull/9171


   So we don't have to bug the committers to kick off tests 😄 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



