[jira] [Assigned] (KAFKA-10378) issue when create producer using java

2020-08-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-10378:
-

Assignee: Luke Chen

> issue when create producer using java
> -
>
> Key: KAFKA-10378
> URL: https://issues.apache.org/jira/browse/KAFKA-10378
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0
> Environment: mac os
> java version "1.8.0_231"
> intellij 
>Reporter: Mohammad Abdelqader
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 2.7.0, 2.6.1
>
>
> I created a simple producer in Java using IntelliJ. When I run the project,
> it returns the following issue:
> {code:java}
> [kafka-producer-network-thread | producer-1] ERROR
> org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread
> 'kafka-producer-network-thread | producer-1':
> java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/JsonNode
> at org.apache.kafka.common.requests.ApiVersionsRequest$Builder.<init>(ApiVersionsRequest.java:36)
> at org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 6 more
> {code}
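
A note on the failure and a workaround: the kafka-clients 2.6.0 jar references
com.fasterxml.jackson.databind at runtime (the ApiVersionsRequest frame above
shows the generated request classes using it), so the producer's network thread
dies when jackson-databind is missing from the application classpath. Until the
dependency is decoupled (see KAFKA-10384 below), adding
com.fasterxml.jackson.core:jackson-databind to the project's dependencies works
around it. A minimal sketch of the kind of producer that hits this path; the
broker address and topic name are illustrative, not taken from the report:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // With kafka-clients 2.6.0, jackson-databind must also be on the
        // classpath; otherwise the network thread fails with the
        // NoClassDefFoundError shown in this report.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
        }
    }
}
{code}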



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code

2020-08-10 Thread GitBox


guozhangwang commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r468327920



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##
@@ -1366,6 +1366,10 @@ public void handleResponse(AbstractResponse response) {
 } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
 error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
 fatalError(error.exception());
+} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
+// We could still receive INVALID_PRODUCER_EPOCH from transaction coordinator,

Review comment:
   Good catch.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##
@@ -1417,8 +1421,10 @@ public void handleResponse(AbstractResponse response) {
 } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
 reenqueue();
 return;
-} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-fatalError(error.exception());
+} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {

Review comment:
   nit: ".. old versioned transaction coordinator", ditto below.

##
File path: 
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
##
@@ -1571,6 +1637,54 @@ public void testProducerFencedException() throws InterruptedException {
 Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
 }
 
+@Test
+public void testInvalidProducerEpochConvertToProducerFencedInEndTxn() throws InterruptedException {
+doInitTransactions();
+
+transactionManager.beginTransaction();
+transactionManager.failIfNotReadyForSend();
+transactionManager.maybeAddPartitionToTransaction(tp0);
+TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+assertFalse(responseFuture.isDone());
+prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+prepareProduceResponse(Errors.NONE, producerId, epoch);
+prepareEndTxnResponse(Errors.INVALID_PRODUCER_EPOCH, TransactionResult.COMMIT, producerId, epoch);
+
+runUntil(commitResult::isCompleted);
+runUntil(responseFuture::isDone);
+
+// make sure the exception was thrown directly from the follow-up calls.
+assertThrows(KafkaException.class, () -> transactionManager.beginTransaction());
+assertThrows(KafkaException.class, () -> transactionManager.beginCommit());
+assertThrows(KafkaException.class, () -> transactionManager.beginAbort());
+assertThrows(KafkaException.class, () -> transactionManager.sendOffsetsToTransaction(
+Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
+}
+
+@Test
+public void testInvalidProducerEpochFromProduce() throws InterruptedException {
+doInitTransactions();
+
+transactionManager.beginTransaction();
+transactionManager.failIfNotReadyForSend();
+transactionManager.maybeAddPartitionToTransaction(tp0);
+
+Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+assertFalse(responseFuture.isDone());
+prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, producerId, epoch);
+prepareProduceResponse(Errors.NONE, producerId, epoch);
+
+sender.runOnce();
+
+runUntil(responseFuture::isDone);
+assertFalse(transactionManager.hasError());

Review comment:
   Thanks for adding the coverage!

##
File path: 
clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more

Review comment:
   Why not put this internal exception in 
`org.apache.kafka.clients.producer.internals`?

##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##
@@ -539,7 +539,7 @@ class TransactionCoordinator(brokerId: Int,
   s"${txnIdAndPidEpoch.transactionalId} due to timeout")
 
   case error@(Errors.INVALID_PRODUCER_ID_MAPPING |
-  Errors.INVALID_PRODUCER_EPOCH |

Review comment:
   In the new broker version, when do we still return 
`INVALID_PRODUCER_EPOCH` then? Or would we never return it any more?
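
   A note for application code: the visible effect of this mapping is that a
   fenced transactional producer surfaces `ProducerFencedException`, which is
   not retriable; the producer must be closed and re-created. A minimal usage
   sketch of the public producer API (broker address, transactional id, and
   topic name are illustrative):

   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.errors.ProducerFencedException;
   import org.apache.kafka.common.serialization.StringSerializer;

   public class TransactionalSend {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");     // illustrative
           props.put("transactional.id", "my-transactional-id"); // illustrative
           props.put("key.serializer", StringSerializer.class.getName());
           props.put("value.serializer", StringSerializer.class.getName());

           KafkaProducer<String, String> producer = new KafkaProducer<>(props);
           producer.initTransactions();
           try {
               producer.beginTransaction();
               producer.send(new ProducerRecord<>("test-topic", "key", "value"));
               producer.commitTransaction();
           } catch (ProducerFencedException e) {
               // Another producer with the same transactional.id took over, or
               // (per this PR) the coordinator answered PRODUCER_FENCED /
               // INVALID_PRODUCER_EPOCH on the end-txn path. Not retriable.
           } catch (KafkaException e) {
               // Other errors within the transaction can be aborted and retried.
               producer.abortTransaction();
           } finally {
               producer.close();
           }
       }
   }
   ```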





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] stanislavkozlovski commented on pull request #9155: MINOR: Ensure a single version of scala-library is used

2020-08-10 Thread GitBox


stanislavkozlovski commented on pull request #9155:
URL: https://github.com/apache/kafka/pull/9155#issuecomment-671714884


   I just updated the PR description to mention that both tests were using 
Jackson `2.10.5`. Apologies for the misleading description; I must have gotten 
confused by the numerous tests I ran before finding the issue.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9158: MINOR: Update the quickstart link in readme

2020-08-10 Thread GitBox


showuon commented on pull request #9158:
URL: https://github.com/apache/kafka/pull/9158#issuecomment-671702480


   @rhauch , could you review this small PR to update the quickstart link? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #9158: MINOR: Update the quickstart link in readme

2020-08-10 Thread GitBox


showuon opened a new pull request #9158:
URL: https://github.com/apache/kafka/pull/9158


   After the new website launched, the quickstart link also changed. Update the 
quickstart link in readme.md.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-10 Thread GitBox


ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r468218540



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class SlidingWindowedCogroupedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedCogroupedKStream<K, V> {
+private final SlidingWindows windows;
+private final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+private final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns;
+
+SlidingWindowedCogroupedKStreamImpl(final SlidingWindows windows,
+final InternalStreamsBuilder builder,
+final Set<String> subTopologySourceNodes,
+final String name,
+final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+final StreamsGraphNode streamsGraphNode,
+final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns) {
+super(name, null, null, subTopologySourceNodes, streamsGraphNode, builder);
+//keySerde and valueSerde are null because there are many different groupStreams that they could be from
+this.windows = windows;
+this.aggregateBuilder = aggregateBuilder;
+this.groupPatterns = groupPatterns;
+}
+
+@Override
+public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer) {
+return aggregate(initializer, Materialized.with(null, null));
+}
+
+@Override
+public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+return aggregate(initializer, NamedInternal.empty(), materialized);
+}
+
+@Override
+public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+final Named named) {
+return aggregate(initializer, named, Materialized.with(null, null));
+}
+
+@Override
+public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+final Named named,
+final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+Objects.requireNonNull(initializer, "initializer can't be null");
+Objects.requireNonNull(named, "named can't be null");
+Objects.requireNonNull(materialized, "materialized can't be null");
+final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(
+materialized,
+builder,
+CogroupedKStreamImpl.AGGREGATE_NAME);
+return aggregateBuilder.build(
+groupPatterns,
+initializer,
+new NamedInternal(named),
+materialize(materializedInternal),
+materializedInternal.keySerde() != null ?
+new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs())
+: null,
+

[GitHub] [kafka] apovzner commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-08-10 Thread GitBox


apovzner commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-671684203


   @rajinisivaram Thanks for your review.  I updated this PR to expose the 
'connection-accept-rate' metrics, addressed the remaining comments, and the 
test should now be fixed as well.
   
   It seems like it would also make sense to add and expose throttle-time 
metrics. What do you think? If so, I will open a separate PR to add the 
throttle-time metrics and to use the Token Bucket quota calculation that David 
added as part of KIP-599, which works better with bursty workloads. I will 
update the KIP accordingly and notify the voting thread.
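
   For reference, the Token Bucket approach from KIP-599 mentioned here works
   by accruing tokens at the configured rate up to a burst capacity and
   spending one token per admitted event, which tolerates short bursts better
   than a plain windowed rate. A minimal generic sketch of the mechanism (not
   Kafka's actual quota code; names are illustrative):

   ```java
   // Minimal token-bucket rate limiter sketch illustrating the general
   // mechanism behind KIP-599; this is not Kafka's implementation.
   public class TokenBucket {
       private final double ratePerSec; // refill rate, tokens per second
       private final double capacity;   // burst capacity, max stored tokens
       private double tokens;
       private long lastRefillNanos;

       public TokenBucket(final double ratePerSec, final double capacity) {
           this.ratePerSec = ratePerSec;
           this.capacity = capacity;
           this.tokens = capacity;
           this.lastRefillNanos = System.nanoTime();
       }

       /** Returns true if the event is admitted, false if it should be throttled. */
       public synchronized boolean tryAcquire() {
           final long now = System.nanoTime();
           // Refill in proportion to elapsed time, capped at the burst size.
           tokens = Math.min(capacity, tokens + (now - lastRefillNanos) / 1e9 * ratePerSec);
           lastRefillNanos = now;
           if (tokens >= 1.0) {
               tokens -= 1.0;
               return true;
           }
           return false;
       }
   }
   ```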



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17175147#comment-17175147
 ] 

Yogesh BG commented on KAFKA-10381:
---

One more observation: when I restart the leader node for that partition, it 
picks up and the issue is resolved. But we cannot do restarts in a real 
scenario; we would incur data loss during restarts.

> Add broker to a cluster not rebalancing partitions
> --
>
> Key: KAFKA-10381
> URL: https://issues.apache.org/jira/browse/KAFKA-10381
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: Yogesh BG
>Priority: Major
>
> Hi
> I have a 3-node cluster and a topic with one partition. When a node is 
> deleted and another node is added, the topic goes into an unknown state and 
> we are not able to write/read anything; the exception below is seen:
>  
> {code:java}
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition C-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition B-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> {code}
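
Kafka does not automatically move existing partitions onto a newly added 
broker; a reassignment has to be triggered explicitly. On 2.4+ brokers this 
can be done with the Admin API (on 2.3.0, as reported here, the equivalent is 
the kafka-reassign-partitions.sh tool). A hedged sketch using partition A-0 
and broker ids 1004/1005 from the log above; the bootstrap address is 
illustrative:

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class ReassignPartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        try (Admin admin = Admin.create(props)) {
            // Move partition A-0 onto replicas 1004 and 1005 (the new broker).
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment =
                Collections.singletonMap(
                    new TopicPartition("A", 0),
                    Optional.of(new NewPartitionReassignment(Arrays.asList(1004, 1005))));
            admin.alterPartitionReassignments(reassignment).all().get();
        }
    }
}
{code}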



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9112: KAFKA-10312 Fix error code returned by getPartitionMetadata

2020-08-10 Thread GitBox


hachikuji commented on a change in pull request #9112:
URL: https://github.com/apache/kafka/pull/9112#discussion_r468270027



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -229,7 +205,7 @@ class MetadataCacheTest {

errorUnavailableListeners: Boolean): Unit = {
 val topic = "topic"
 
-val cache = new MetadataCache(1)
+val cache = new MetadataCache(9)

Review comment:
   nit: maybe it would be clearer if we passed `brokerId` as an argument? 
Then we could use broker 0 in the test case, for example, so that the 
UpdateMetadata request makes more sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10223) ReplicaNotAvailableException must be retriable to handle reassignments

2020-08-10 Thread Dongjoon Hyun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17175094#comment-17175094
 ] 

Dongjoon Hyun commented on KAFKA-10223:
---

Thank you for the confirmation, [~rsivaram].

> ReplicaNotAvailableException must be retriable to handle reassignments
> --
>
> Key: KAFKA-10223
> URL: https://issues.apache.org/jira/browse/KAFKA-10223
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.6.0
>
>
> ReplicaNotAvailableException should be a retriable `InvalidMetadataException` 
> since consumers may throw this during reassignments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468111300



##
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##
@@ -82,18 +110,54 @@ object FinalizedFeatureCache extends Logging {
 " The existing cache contents are %s").format(latest, oldFeatureAndEpoch)
   throw new FeatureCacheUpdateException(errorMsg)
 } else {
-  val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features)
+  val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features)
   if (!incompatibleFeatures.empty) {
 val errorMsg = ("FinalizedFeatureCache update failed since feature compatibility" +
   " checks failed! Supported %s has incompatibilities with the latest %s."
-  ).format(SupportedFeatures.get, latest)
+  ).format(brokerFeatures.supportedFeatures, latest)

Review comment:
   Good question. The existing behavior is that it shuts itself down, as 
triggered by this LOC. The reason to do it is that an incompatible broker can 
potentially do harmful things to a cluster (because max version level upgrades 
are used for breaking changes): 
https://github.com/apache/kafka/blob/89a3ba69e03acbe9635ee1039abb567bf0c6631b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala#L154-L156.
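
   For intuition, the compatibility rule at play: a broker is incompatible when
   a finalized feature's max version level exceeds (or is unknown to) what the
   broker itself supports, and letting such a broker keep running could break
   the cluster. An illustrative generic sketch of that check (hypothetical
   names; not Kafka's actual code):

   ```java
   import java.util.Map;

   public final class FeatureCompatibility {
       // A finalized feature is incompatible if the broker does not know the
       // feature or supports only a lower max version than the finalized level.
       public static boolean isIncompatible(final Map<String, Integer> supportedMaxVersions,
                                            final Map<String, Integer> finalizedMaxVersionLevels) {
           return finalizedMaxVersionLevels.entrySet().stream().anyMatch(entry -> {
               final Integer supportedMax = supportedMaxVersions.get(entry.getKey());
               return supportedMax == null || supportedMax < entry.getValue();
           });
       }
   }
   ```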
 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-10 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r468149236



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,64 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries,
 AlterClientQuotasOptions options);
 
+/**
+ * Describe all SASL/SCRAM credentials.
+ *
+ * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @return The DescribeUserScramCredentialsResult.
+ */
+default DescribeUserScramCredentialsResult describeUserScramCredentials() {
+return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions());
+}
+
+/**
+ * Describe SASL/SCRAM credentials for the given users.
+ *
+ * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @param users the users for which credentials are to be described; all users' credentials are described if null
+ *  or empty.  A user explicitly specified here that does not have a SCRAM credential will not appear
+ *  in the results.

Review comment:
   Hmm, good question.  The KIP doesn't state what to do here.  @cmccabe 
thoughts?
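
   For orientation, a hedged usage sketch of the convenience overload above
   from an application's point of view (the API was still under review in this
   PR, so result-type details may have shifted before release; the bootstrap
   address is illustrative):

   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult;

   public class DescribeScramUsers {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092"); // illustrative
           try (Admin admin = Admin.create(props)) {
               // Describe credentials for all users via the convenience overload.
               DescribeUserScramCredentialsResult result = admin.describeUserScramCredentials();
               result.all().get().forEach((user, description) ->
                   System.out.println(user + " -> " + description));
           }
       }
   }
   ```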

##
File path: 
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
##
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+
+import kafka.network.SocketServer
+import kafka.security.authorizer.AclAuthorizer
+import org.apache.kafka.clients.admin.ScramMechanism
+import org.apache.kafka.common.acl.AclOperation
+import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse}
+import org.apache.kafka.common.resource.ResourceType
+import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
+import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
+import org.junit.Assert._
+import org.junit.{Before, Test}
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * Test AlterUserScramCredentialsRequest/Response API for the cases where 
either no credentials are altered
+ * or failure is expected due to lack of authorization, sending the request to 
a non-controller broker, or some other issue.
+ * Also tests the Alter and Describe APIs for the case where credentials are 
successfully altered/described.
+ */
+class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+properties.put(KafkaConfig.ControlledShutdownEnableProp, "false")
+properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName)
+properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilder].getName)
+  }
+
+  @Before
+  override def setUp(): Unit = {
+AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized
+super.setUp()
+  }
+
+  @Test
+  def testAlterNothing(): Unit = {
+val request = new AlterUserScramCredentialsRequest.Builder(
+  new AlterUserScramCredentialsRequestData()
+.setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion])
+.setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build()
+val response = sendAlterUserScramCredentialsRequest(request)
+
+val results = response.data.results
+assertEquals(0, results.size)
+  }
+
+  @Test
+  def testAlterNothingNotAuthorized(): Unit = {
+AlterCredentialsTest.principal = 

[GitHub] [kafka] vvcephei commented on pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

2020-08-10 Thread GitBox


vvcephei commented on pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#issuecomment-671622789


   Verified the last commit only changed whitespace before merging.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

2020-08-10 Thread GitBox


vvcephei merged pull request #9141:
URL: https://github.com/apache/kafka/pull/9141


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9155: MINOR: Ensure a single version of scala-library is used

2020-08-10 Thread GitBox


ijuma commented on pull request #9155:
URL: https://github.com/apache/kafka/pull/9155#issuecomment-671617848


   Merged to master and cherry-picked to 2.6, 2.5 and 2.4 branches.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-10 Thread GitBox


vvcephei commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r468214782



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.internals.ApiUtils;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import java.time.Duration;
+import java.util.Objects;
+import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+
+/**
+ /**
+ * A sliding window used for aggregating events.
+ * 
+ * Sliding Windows are defined based on a record's timestamp, window size 
based on the given maximum time difference (inclusive) between
+ * records in the same window and given window grace period.

Review comment:
   Haha, my specialty!
   
   The distillation of this sentence is "Windows are defined based on a 
record's timestamp, window size, and window grace period." I think the meaning 
is pretty clear, so no need to change anything.
   
   Just to point it out, there's structural ambiguity about whether the 
sentence is saying "a record's (timestamp, window size, window grace period)" 
(I.e., three properties of the record), or whether there are three top-level 
things that define the window. The latter was intended. I think actually 
inserting "the" before "window" both times would clear it up: "Windows are 
defined based on a record's timestamp, the window size, and the window grace 
period."
   
   Another note is that because the second item in the list is so long, the 
structure of the list gets a little lost. It would be better in this case to 
use the Oxford comma to clearly delineate the boundary between the second and 
third items.
   
   So, although I think this is fine as-is, if you want me to break out the red 
pen, I'd say:
   ```
* Sliding Windows are defined based on a record's timestamp, the window 
size based on the given maximum time difference (inclusive) between
* records in the same window, and the given window grace period.
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

2020-08-10 Thread GitBox


vvcephei commented on pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#issuecomment-671609419


   All the tests were green except for the flaky
   ```
  
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

2020-08-10 Thread GitBox


vvcephei commented on a change in pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#discussion_r468206463



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -85,68 +160,38 @@
 groupedStreams.remove(kGrouped);
 kGrouped.ensureCopartitionWith(groupedStreams);
 
-final Collection processors = new ArrayList<>();
-boolean stateCreated = false;
-int counter = 0;
-for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) {
-final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(
-kGroupedStream.getValue(),
-initializer,
-named.suffixWithOrElseGet(
-"-cogroup-agg-" + counter++,
-builder,
-CogroupedKStreamImpl.AGGREGATE_NAME),
-stateCreated,
-storeBuilder,
-windows,
-sessionWindows,
-sessionMerger);
-stateCreated = true;
-processors.add(statefulProcessorNode);
-builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
-}
+}
+
+ KTable createTable(final Collection processors,
+   final NamedInternal named,
+   final Serde keySerde,
+   final Serde valueSerde,
+   final String queryableName) {
+   final NamedInternal named,
+   final Serde keySerde,
+   final Serde valueSerde,
+   final String queryableName) {
 final String mergeProcessorName = named.suffixWithOrElseGet(
-"-cogroup-merge",
-builder,
-CogroupedKStreamImpl.MERGE_NAME);
+"-cogroup-merge",
+builder,
+CogroupedKStreamImpl.MERGE_NAME);

Review comment:
   It seems like your indentation is set to 8 spaces instead of 4.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17175074#comment-17175074
 ] 

John Roesler commented on KAFKA-10383:
--

Thanks for the report, [~marcolotz].

This seems like a design oversight. It does seem desirable to plug in different 
stores as the subscription store.

I'm not sure if I'd piggy-back on the existing Materialized argument, as the 
subscription state would have a completely different shape and dynamics from 
the join result (which is what Materialized configures). Plus, you may want to 
(e.g.) set the subscription state to in-memory without materializing the join 
result. If we piggy-back, there would be no way to express this.

At a glance, it seems like we should have a separate argument to the join, 
which would be a new object allowing us to configure the things that make 
sense for a subscription store:
 * KeyValueBytesStoreSupplier: the kind of store to use
 * withLoggingEnabled(final Map<String, String> config) / withLoggingDisabled(): the changelog configs
 * withCachingEnabled() / withCachingDisabled(): the caching configs

 

This would require a KIP, of course. Are you open to contributing this feature? 
I think a lot of people would find it helpful as the feature becomes more 
popular. I'd be happy to help you with the process if you're willing.

Thanks,

-John
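
A rough shape of the configuration object described above might look like the 
following (purely hypothetical names for illustration; the actual API would be 
defined by the KIP):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;

// Hypothetical subscription-store config sketched from the three knobs listed
// in the comment above; none of these names come from an accepted KIP.
public final class SubscriptionStoreConfig {
    private KeyValueBytesStoreSupplier storeSupplier; // the kind of store to use
    private Map<String, String> logConfig;            // changelog topic configs
    private boolean loggingEnabled = true;
    private boolean cachingEnabled = true;

    public static SubscriptionStoreConfig with(final KeyValueBytesStoreSupplier supplier) {
        final SubscriptionStoreConfig config = new SubscriptionStoreConfig();
        config.storeSupplier = supplier;
        return config;
    }

    public SubscriptionStoreConfig withLoggingEnabled(final Map<String, String> config) {
        this.loggingEnabled = true;
        this.logConfig = config;
        return this;
    }

    public SubscriptionStoreConfig withLoggingDisabled() {
        this.loggingEnabled = false;
        return this;
    }

    public SubscriptionStoreConfig withCachingEnabled() {
        this.cachingEnabled = true;
        return this;
    }

    public SubscriptionStoreConfig withCachingDisabled() {
        this.cachingEnabled = false;
        return this;
    }
}
{code}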

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB storage being created 
> disregarding materialization method forces any IT test to necessarily use the 
> manual FS deletion with exception swallowing hack.
>  * Short lived Streams: Ktables can be short lived in a way that neither 
> persistent storage nor change-logs creation are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a similar materialization method (to the one 
> provided in the argument) when creating the intermediary Foreign Key 
> state-store. If the Materialization is in memory and without changelog, the 
> same happens in the intermediate state-store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] skaundinya15 commented on pull request #9143: MINOR: Fix the way total consumed is calculated for verifiable consumer

2020-08-10 Thread GitBox


skaundinya15 commented on pull request #9143:
URL: https://github.com/apache/kafka/pull/9143#issuecomment-671599091


   I re-ran the system tests for `streams_broker_compatibility_test` here: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-08-10--001.1597088498--skaundinya15--minor-fix-total-consumed-for-verifiable-consumer--d799e563e/report.html
 and got a green build. Seems like anything dependent on this method/variable 
checks out ok for this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10383:
-
Component/s: (was: core)
 streams

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB storage being created 
> disregarding materialization method forces any IT test to necessarily use the 
> manual FS deletion with exception swallowing hack.
>  * Short lived Streams: Ktables can be short lived in a way that neither 
> persistent storage nor change-logs creation are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a similar materialization method (to the one 
> provided in the argument) when creating the intermediary Foreign Key 
> state-store. If the Materialization is in memory and without changelog, the 
> same happens in the intermediate state-store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang merged pull request #9153: MINOR: Fix state transition diagram for stream threads

2020-08-10 Thread GitBox


guozhangwang merged pull request #9153:
URL: https://github.com/apache/kafka/pull/9153


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-08-10 Thread GitBox


apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r468175154



##
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##
@@ -70,37 +80,38 @@ class ConnectionQuotasTest {
 blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter(
   s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, 
Map(ListenerMetricTag -> name)))
 }
+// use system time, because ConnectionQuota causes the current thread to 
wait with timeout, which waits based on
+// system time; so using mock time will likely result in test flakiness 
due to a mixed use of mock and system time
+metrics = new Metrics(new MetricConfig(), Collections.emptyList(), 
Time.SYSTEM)

Review comment:
   This is required for all tests. For tests that are not supposed to 
trigger throttling due to connection rate quota, we want this because if the 
code incorrectly throttles to limit rate (calls wait() with timeout), the 
existing tests may start failing in a way that is hard to debug (timeout too 
early or too late, not in a place we expect). 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #9155: MINOR: Ensure a single version of scala-library is used

2020-08-10 Thread GitBox


ijuma merged pull request #9155:
URL: https://github.com/apache/kafka/pull/9155


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9155: MINOR: Ensure a single version of scala-library is used

2020-08-10 Thread GitBox


ijuma commented on pull request #9155:
URL: https://github.com/apache/kafka/pull/9155#issuecomment-671562430


   2 builds passed, 1 flaky test failure for Java 8.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10384) Separate converters from generated messages

2020-08-10 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-10384:


 Summary: Separate converters from generated messages
 Key: KAFKA-10384
 URL: https://issues.apache.org/jira/browse/KAFKA-10384
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe


Separate the JSON converter classes from the message classes, so that the 
clients module can be used without Jackson on the CLASSPATH.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz updated KAFKA-10383:
---
Description: 
*Status Quo:*
 The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this internal state-store.

 

*Related problems:*
 * IT Tests: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that are affected by the RocksDB filesystem removal problem, an approach to 
avoid the bug is to use in-memory state-stores (rather than exception 
swallowing). Having the intermediate RocksDB storage being created disregarding 
materialization method forces any IT test to necessarily use the manual FS 
deletion with exception swallowing hack.
 * Short lived Streams: Ktables can be short lived in a way that neither 
persistent storage nor change-logs creation are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use a similar materialization method (to the one 
provided in the argument) when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the intermediate state-store.

  was:
*Status Quo:*
 The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this internal state-store.

 

*Related problems:*
 * IT Tests: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that have the RocksDB filesystem removal problem, a solution to avoid the bug 
is to use in-memory state-stores (rather than exception swallowing). Having the 
RocksDB storage being forcely created makes that any IT test necessarily use 
the manual FS deletion with exception swallow hack.
 * Short lived Streams: Sometimes, Ktables are short lived in a way that 
neither Persistance storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the state-sore. 


> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB storage being created 
> disregarding materialization method forces any IT test to necessarily use the 
> manual FS deletion with exception swallowing hack.
>  * Short lived Streams: Ktables can be short lived in a way that neither 
> 

[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz updated KAFKA-10383:
---
Description: 
*Status Quo:*
 The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this internal state-store.

 

*Related problems:*
 * IT Tests: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that have the RocksDB filesystem removal problem, a solution to avoid the bug 
is to use in-memory state-stores (rather than exception swallowing). Having the 
RocksDB storage being forcely created makes that any IT test necessarily use 
the manual FS deletion with exception swallow hack.
 * Short lived Streams: Sometimes, Ktables are short lived in a way that 
neither Persistance storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the state-sore. 

  was:
*Status Quo:*
 The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this internal state-store.

 

*Related problems:*
 * **IT Test: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that have the RocksDB filesystem removal problem, a solution to avoid the bug 
is to use in-memory state-stores (rather than exception swallowing). Having the 
RocksDB storage being forcely created makes that any IT test necessarily use 
the manual FS deletion with exception swallow hack.
 * Short lived Streams: Sometimes, Ktables are short lived in a way that 
neither Persistance storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the state-sore. 


> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that have the RocksDB filesystem removal problem, a solution to avoid the 
> bug is to use in-memory state-stores (rather than exception swallowing). 
> Having the RocksDB storage being forcely created makes that any IT test 
> necessarily use the manual FS deletion with exception swallow hack.
>  * Short lived Streams: Sometimes, Ktables are short lived in a way that 
> neither Persistance storage nor changelogs are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution 

[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz updated KAFKA-10383:
---
Description: 
*Status Quo:*
 The current implementation of 
[KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] 
of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this internal state-store.

 

*Related problems:*
 * IT Tests: Having an implicit materialization method for the state-store 
affects tests using foreign-key state-stores. [On Windows-based 
systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application],
 which have the RocksDB filesystem-removal problem, a solution to avoid the 
bug is to use in-memory state-stores (rather than exception swallowing). 
Having the RocksDB storage forcibly created means that any IT test must 
resort to the manual FS-deletion-with-exception-swallowing hack.
 * Short-lived streams: sometimes KTables are short-lived, in a way that 
neither persistent storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the provided Materialized is in-memory and without a 
changelog, the same applies to the state-store. 

  was:
*Status Quo:*
The current implementation of 
[KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] 
of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this state-store.

 

*Related problems:*
 * IT Tests: Having an implicit materialization method for the state-store 
affects tests using foreign-key state-stores. [On Windows-based 
systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application],
 which have the RocksDB filesystem-removal problem, a solution to avoid the 
bug is to use in-memory state-stores (rather than exception swallowing). 
Having the RocksDB storage forcibly created means that any IT test must 
resort to the manual FS-deletion-with-exception-swallowing hack.
 * Short-lived streams: sometimes KTables are short-lived, in a way that 
neither persistent storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the provided Materialized is in-memory and without a 
changelog, the same applies to the state-store. 


> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>
> *Status Quo:*
>  The current implementation of 
> [KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for the state-store 
> affects tests using foreign-key state-stores. [On Windows-based 
> systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application],
>  which have the RocksDB filesystem-removal problem, a solution to avoid the 
> bug is to use in-memory state-stores (rather than exception swallowing). 
> Having the RocksDB storage forcibly created means that any IT test must 
> resort to the manual FS-deletion-with-exception-swallowing hack.
>  * Short-lived streams: sometimes KTables are short-lived, in a way that 
> neither persistent storage nor changelogs are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to 

[jira] [Created] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread Marco Lotz (Jira)
Marco Lotz created KAFKA-10383:
--

 Summary: KTable Join on Foreign key is opinionated 
 Key: KAFKA-10383
 URL: https://issues.apache.org/jira/browse/KAFKA-10383
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.4.1
Reporter: Marco Lotz


*Status Quo:*
The current implementation of 
[KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] 
of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this state-store.

 

*Related problems:*
 * IT Tests: Having an implicit materialization method for the state-store 
affects tests using foreign-key state-stores. [On Windows-based 
systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application],
 which have the RocksDB filesystem-removal problem, a solution to avoid the 
bug is to use in-memory state-stores (rather than exception swallowing). 
Having the RocksDB storage forcibly created means that any IT test must 
resort to the manual FS-deletion-with-exception-swallowing hack.
 * Short-lived streams: sometimes KTables are short-lived, in a way that 
neither persistent storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the provided Materialized is in-memory and without a 
changelog, the same applies to the state-store. 
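
For illustration, a minimal sketch of the call pattern the suggestion targets 
(topic names, value types, and store name are hypothetical; today the 
Materialized argument below does not stop the intermediary FK-join store from 
being RocksDB-backed):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();
// "orders" values hold the customer id; "customers" values hold customer data
KTable<String, String> orders = builder.table("orders");
KTable<String, String> customers = builder.table("customers");

KTable<String, String> enriched = orders.join(
    customers,
    orderValue -> orderValue,                    // foreign-key extractor
    (order, customer) -> order + "/" + customer, // value joiner
    // in-memory and without a changelog: the suggestion is that the
    // intermediary foreign-key state-store should honour this choice too
    Materialized.<String, String>as(Stores.inMemoryKeyValueStore("enriched"))
        .withLoggingDisabled());
{code}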



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #9153: MINOR: Fix state transition diagram for stream threads

2020-08-10 Thread GitBox


cadonna commented on a change in pull request #9153:
URL: https://github.com/apache/kafka/pull/9153#discussion_r468121273



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -93,7 +95,7 @@
  *  |  +-+---+  |
  *  +< | Partitions  |  |
  *  |  | Assigned (3)| <+
- *  |  +-+---+  |
+ *  |  +-+---+  ^

Review comment:
   I had the same feeling. I will change it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468111785



##
File path: core/src/main/scala/kafka/server/BrokerFeatures.scala
##
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.Logging
+import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
+import org.apache.kafka.common.feature.Features._
+
+import scala.jdk.CollectionConverters._
+
+/**
+ * A class that encapsulates the following:
+ *
+ * 1. The latest features supported by the Broker.
+ *
+ * 2. The default minimum version levels for specific features. This map 
enables feature

Review comment:
   Sure, I'll update the PR documenting it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468111300



##
File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala
##
@@ -82,18 +110,54 @@ object FinalizedFeatureCache extends Logging {
 " The existing cache contents are %s").format(latest, 
oldFeatureAndEpoch)
   throw new FeatureCacheUpdateException(errorMsg)
 } else {
-  val incompatibleFeatures = 
SupportedFeatures.incompatibleFeatures(latest.features)
+  val incompatibleFeatures = 
brokerFeatures.incompatibleFeatures(latest.features)
   if (!incompatibleFeatures.empty) {
 val errorMsg = ("FinalizedFeatureCache update failed since feature 
compatibility" +
   " checks failed! Supported %s has incompatibilities with the latest 
%s."
-  ).format(SupportedFeatures.get, latest)
+  ).format(brokerFeatures.supportedFeatures, latest)

Review comment:
   Good question. The existing behavior is that it shuts itself down, as 
triggered by this LOC: 
https://github.com/apache/kafka/blob/89a3ba69e03acbe9635ee1039abb567bf0c6631b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala#L154-L156.
 The reason to do it is that an incompatible broker can potentially do harmful 
things to a cluster (because max version level upgrades are used for breaking 
changes).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468109791



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val incompatibilityError = "Could not apply finalized feature update 
because" +
+  " brokers were found to have incompatible versions for the feature."
+
+if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {

Review comment:
   It's required because `defaultMinVersionLevel` does not exist for a 
feature that's not in the supported list. However, I'll change the code to make 
the check more obvious to the reader (currently it's not).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9153: MINOR: Fix state transition diagram for stream threads

2020-08-10 Thread GitBox


ableegoldman commented on a change in pull request #9153:
URL: https://github.com/apache/kafka/pull/9153#discussion_r468104424



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -93,7 +95,7 @@
  *  |  +-+---+  |
  *  +< | Partitions  |  |
  *  |  | Assigned (3)| <+
- *  |  +-+---+  |
+ *  |  +-+---+  ^
  *  ||  |
  *  ||--+

Review comment:
   It seems like this here is the problem...AFAICT this line is just to 
demonstrate that RUNNING -> RUNNING is valid, but it's tied in to the arrows 
connecting every other state that actually can't transition to Running. Can we 
change this to a self-contained loop like you added for PARTITIONS_REVOKED?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -93,7 +95,7 @@
  *  |  +-+---+  |
  *  +< | Partitions  |  |
  *  |  | Assigned (3)| <+
- *  |  +-+---+  |
+ *  |  +-+---+  ^

Review comment:
   This seems a bit subtle. What if we instead break up the loop from 
Running --> Running below?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -80,11 +80,13 @@
  *  |  +-+---+
  *  +< | Starting (1)|->+
  *  |  +-+---+  |
- *  ||  |
- *  ||  |
- *  |v  |
- *  |  +-+---+  |
- *  +< | Partitions  |  |
+ *  |   |
+ *  |   |
+ *  |+<--+  |
+ *  ||   |  |
+ *  |v   |  |
+ *  |  +-+---+   |  |
+ *  +< | Partitions  | --+  |

Review comment:
   Nice catch





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9659) Kafka Streams / Consumer configured for static membership fails on "fatal exception: group.instance.id gets fenced"

2020-08-10 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9659.
--
Fix Version/s: 2.6.0
   Resolution: Fixed

> Kafka Streams / Consumer configured for static membership fails on "fatal 
> exception: group.instance.id gets fenced"
> ---
>
> Key: KAFKA-9659
> URL: https://issues.apache.org/jira/browse/KAFKA-9659
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Rohan Desai
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
> Attachments: ksql-1.logs
>
>
> I'm running a KSQL query, which underneath is built into a Kafka Streams 
> application. The application has been running without issue for a few days, 
> until today, when all the streams threads exited with: 
>  
>  
> {{[ERROR] 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - 
> [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] 
> Received fatal exception: group.instance.id gets fenced}}
> {{[ERROR] 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - 
> [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] 
> Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}}
> {{[ERROR] 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.streams.processor.internals.StreamThread run - 
> stream-thread 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors:}}
>  \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker 
> rejected this static consumer since another consumer with the same 
> group.instance.id has registered with a different member.id.}}{{[INFO] 
> 2020-03-05 00:57:58,776 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  org.apache.kafka.streams.processor.internals.StreamThread setState - 
> stream-thread 
> [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2]
>  State transition from RUNNING to PENDING_SHUTDOWN}}
>  
> I've attached the KSQL and Kafka Streams logs to this ticket. Here's a 
> summary for one of the streams threads (instance id `ksql-1-2`):
>  
> Around 00:56:36 the coordinator fails over from b11 to b2:
>  
> {{[INFO] 2020-03-05 00:56:36,258 
> [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - 
> [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to 
> heartbeat failed since coordinator 
> b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: 
> null) is either not started or not valid.}}
>  {{ [INFO] 2020-03-05 00:56:36,258 
> [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
>  org.apache.kafka.clients.consumer.internals.AbstractCoordinator 
> markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, 
> clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer,
>  groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group 
> coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 
> 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery}}
>  {{ [INFO] 2020-03-05 00:56:36,270 
> [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2]
>  

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468103224



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+
+val incompatibilityError = "Could not apply finalized feature update 
because" +
+  " brokers were found to have incompatible versions for the feature."
+
+if (brokerFeatures.supportedFeatures.get(update.feature()) == null) {
+  Right(new ApiError(Errors.INVALID_REQUEST, incompatibilityError))
+} else {
+  val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+  val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)

Review comment:
   Yes, excellent point. I'll fix this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-10 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-671520491


   @chia7712 : I agree mostly with your assessment. For most delayed 
operations, the checking for the completeness of the operation and the calling 
of onComplete() don't have to be protected under the same lock.
   
   The only one that I am not quite sure about is DelayedJoin. Currently, 
DelayedJoin.tryCompleteJoin() checks if all members have joined and 
DelayedJoin.onComplete() modifies the state of the group. Both operations are 
done under the same group lock. If we relax the lock, it seems that the 
condition "all members have joined" may no longer be true when we get to 
DelayedJoin.onComplete() even though that condition was true during the 
DelayedJoin.tryCompleteJoin() check. It's not clear what we should do in that 
case.
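   
   To make the race concrete, a schematic Java sketch (an illustration only, 
not the actual Scala `DelayedJoin` code) of what splitting the lock would 
allow:
   
   ```java
   // If tryCompleteJoin() and onComplete() each acquire the group lock
   // independently, the condition observed in the first call can be
   // invalidated before the second call runs.
   class DelayedJoinSketch {
       private final Object groupLock = new Object();
       private boolean allMembersJoined = false;

       boolean tryCompleteJoin() {
           boolean ready;
           synchronized (groupLock) {
               ready = allMembersJoined; // condition observed under the lock
           }
           // lock released here: a member may leave before onComplete() runs
           return ready;
       }

       void onComplete() {
           synchronized (groupLock) {
               // mutates group state, possibly after the condition checked
               // above has stopped holding
           }
       }
   }
   ```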



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468101982



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -977,14 +1179,30 @@ class KafkaController(val config: KafkaConfig,
 
   /**
* Send the leader information for selected partitions to selected brokers 
so that they can correctly respond to
-   * metadata requests
+   * metadata requests. Particularly, when feature versioning is enabled, we 
filter out brokers with incompatible
+   * features from receiving the metadata requests. This is because we do not 
want to activate incompatible brokers,
+   * as these may have harmful consequences to the cluster.

Review comment:
   Good question. Yes, the broker will shut itself down. But there is still 
a possible race condition that needs to be handled to prevent an incompatible 
broker from causing damage to the cluster. The race condition is described in 
KIP-584 [in this 
section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Incompatiblebrokerlifetimeracecondition).
 Please let me know your thoughts.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468097360



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This 

[GitHub] [kafka] lct45 opened a new pull request #9157: Update for KIP-450 to handle early records

2020-08-10 Thread GitBox


lct45 opened a new pull request #9157:
URL: https://github.com/apache/kafka/pull/9157


   Handles records whose timestamps fall between 0 and the timeDifference, 
which would normally create windows with negative start times. A new record 
that falls into this range is placed in a window of [0, timeDifference], and 
the record's right windows are created as later records fall into them. 
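   
   For illustration (numbers hypothetical): with a timeDifference of 10, a 
record at time 6 would otherwise define a left window of [-4, 6]; under this 
change it is placed in [0, 10] instead, and its right windows are only 
created once later records actually fall into them.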
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9136: KAFKA-10211: Add DirectoryConfigProvider

2020-08-10 Thread GitBox


dajac commented on a change in pull request #9136:
URL: https://github.com/apache/kafka/pull/9136#discussion_r468093788



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyMap;
+
+/**
+ * An implementation of {@link ConfigProvider} based on a directory of files.
+ * Property keys correspond to the names of the regular (i.e. non-directory)
+ * files in a directory given by the path parameter.
+ * Property values are taken from the file contents corresponding to each key.
+ */
+public class DirectoryConfigProvider implements ConfigProvider {
+
+private static final Logger log = 
LoggerFactory.getLogger(DirectoryConfigProvider.class);
+
+@Override
+public void configure(Map<String, ?> configs) { }
+
+@Override
+public void close() throws IOException { }
+
+/**
+ * Retrieves the data contained in regular files in the directory given by 
{@code path}.
+ * Non-regular files (such as directories) in the given directory are 
silently ignored.
+ * @param path the directory where data files reside.
+ * @return the configuration data.
+ */
+@Override
+public ConfigData get(String path) {
+return get(path, Files::isRegularFile);
+}
+
+/**
+ * Retrieves the data contained in the regular files named by {@code keys} 
in the directory given by {@code path}.
+ * Non-regular files (such as directories) in the given directory are 
silently ignored.
+ * @param path the directory where data files reside.
+ * @param keys the keys whose values will be retrieved.
+ * @return the configuration data.
+ */
+@Override
+public ConfigData get(String path, Set<String> keys) {
+return get(path, pathname ->
+Files.isRegularFile(pathname)
+&& keys.contains(pathname.getFileName().toString()));
+}
+
+private static ConfigData get(String path, Predicate<Path> fileFilter) {
+Map<String, String> map = emptyMap();
+if (path != null && !path.isEmpty()) {
+Path dir = new File(path).toPath();
+if (!Files.isDirectory(dir)) {
+log.warn("The path {} is not a directory", path);
+} else {
+try {
+map = Files.list(dir)
+.filter(fileFilter)
+.collect(Collectors.toMap(
+p -> p.getFileName().toString(),
+p -> {
+try {
+return read(p);
+} catch (IOException e) {
+throw new ConfigException("Could not read 
file " + p + " for property " + p.getFileName(), e);
+}

Review comment:
   nit: Could we push the `try catch` block into the `read` method? That 
would streamline the code a little bit.
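   
   For example, one possible shape of that refactor (a sketch only; the 
actual change may differ):
   
   ```java
   // read() absorbs the checked IOException so the stream pipeline above
   // stays flat; the collector's value mapper then becomes `p -> read(p)`
   // or a plain method reference.
   private static String read(Path path) {
       try {
           return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
       } catch (IOException e) {
           throw new ConfigException(
               "Could not read file " + path + " for property " + path.getFileName(), e);
       }
   }
   ```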





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-10 Thread GitBox


dajac commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468090360



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
##
@@ -74,7 +77,9 @@ public RequestAndSize parseRequest(ByteBuffer buffer) {
 ", apiVersion: " + header.apiVersion() +
 ", connectionId: " + connectionId +
 ", listenerName: " + listenerName +
-", principal: " + principal, ex);
+", principal: " + principal +
+", initial principal: " + initialPrincipalName() +
+", initial client id: " + header.initialClientId(), 
ex);

Review comment:
   nit: Could we use camel case like the others?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2020-08-10 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174986#comment-17174986
 ] 

Sophie Blee-Goldman commented on KAFKA-10357:
-

How are we going to handle restarts/upgrades/etc? The only way to distinguish 
between a "first-ever" rebalance and the rebalance following a restart is to 
persist that information; otherwise a member who gets bounced and rejoins will 
assume it's the very first rebalance.

We could augment the subscription protocol but even that wouldn't be safe for a 
non-rolling upgrade. If every member is stopped and restarted, they'll all lose 
knowledge of their past lives and everyone will assume it's the first 
rebalance. Maybe that's not so bad, and we can just warn people not to delete 
all their topics when they do a full restart. (Of course, if warning people 
were sufficient, we wouldn't be having this conversation in the first place...)

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Repartition topics are both written by Stream's producer and read by Stream's 
> consumer, so when they are accidentally deleted both clients may be notified. 
> But in practice the consumer would react to it much quicker than producer 
> since the latter has a delivery timeout expiration period (see 
> https://issues.apache.org/jira/browse/KAFKA-10356). When consumer reacts to 
> it, it will re-join the group since metadata changed and during the triggered 
> rebalance it would auto-recreate the topic silently and continue, causing 
> data lost silently. 
> One idea, is to only create all repartition topics *once* in the first 
> rebalance and not auto-create them any more in future rebalances, instead it 
> would be treated similar as INCOMPLETE_SOURCE_TOPIC_METADATA error code 
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenge part would be, how to determine if it is the first-ever 
> rebalance, and there are several wild ideas I'd like to throw out here:
> 1) change the thread state transition diagram so that CREATED state would not 
> transit to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the 
> assign function we can check if the state is still in CREATED and not RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the first 
> time ever rebalance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468089357



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.

Review comment:
   To be sure we are on the same page, is this because of a controller failover 
during an IBP bump?
   It seems to me that this can happen mainly when IBP is being bumped from a 
value less than KAFKA_2_7_IV0 to a value greater than or equal to KAFKA_2_7_IV0 
(assuming subsequent IBP bumps will be from KAFKA_2_7_IV0 to a higher value, so 
the node status will remain enabled).
   
   In general, I'm not sure how to avoid this node-status flip until the IBP 
bump has been completed cluster-wide. 
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on pull request #9143: MINOR: Fix the way total consumed is calculated for verifiable consumer

2020-08-10 Thread GitBox


skaundinya15 commented on pull request #9143:
URL: https://github.com/apache/kafka/pull/9143#issuecomment-671506739


   Link for system test run for this branch: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-08-07--001.1596860596--skaundinya15--minor-fix-total-consumed-for-verifiable-consumer--d799e563e/report.html.
 The one test that might be related is `streams_broker_compatibility_test` but 
it looked like it failed on something unrelated, so I am rerunning that test to 
see if it's okay.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468085443



##
File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+",
+  "about": "Results for each feature update.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,

Review comment:
   Yes, we changed to have an error code per feature update. I'll update 
the KIP-584 write up.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r468084269



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+  "about": "The list of updates to finalized features.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+"about": "The name of the finalized feature to be updated."},
+  {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+"about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+  {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
   I'm missing something. Which lines in KIP-584 were you referring to? 
I didn't find any mention of the flag being at the topic level.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-08-10 Thread GitBox


big-andy-coates commented on pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#issuecomment-671501927


   @mjsax as discussed. Please review.
   
   I think this change makes the output from the table filter semantically 
correct, i.e. we no longer output tombstones for rows that didn't exist in the 
output to begin with. However, this comes at a cost!  Now the source table is 
being materialized (as you can see from the changes needed to get some other 
tests to pass).
   
   The cost of better semantics could be very high, and the user has no way of 
avoiding this. Whereas previously the user could choose to 'fix' the bad 
semantics by manually calling `enableSendingOldValues` themselves, if they 
cared. I'm left feeling a little uneasy about _forcing_ users to pay the cost 
of materialization, even if they either don't care about the spurious 
tombstones, or their use case doesn't generate them.
   
   This leads me to the following questions:
   
   1. Wouldn't this be a breaking change for existing users of the library? If 
we stick with this solution, how would we handle this?
   1. Might it be better to only enable the sending of old values _if_ the 
source table is already materialized? This would mean the fix only pays the 
cost of an additional RocksDB read, which is still not zero, but much lower 
than forced materialization, and it would also mean this isn't a breaking 
change.
   
   Or maybe we choose to _not_ fix this, preferring the current semantically 
incorrect, but better performing, solution with a known workaround for users 
that require correct semantics?  i.e. we could document the use of 
`enableSendingOldValues` in the `filter` method's java docs.
   
   Your thoughts my good man?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9112: KAFKA-10312 Fix error code returned by getPartitionMetadata

2020-08-10 Thread GitBox


hachikuji commented on pull request #9112:
URL: https://github.com/apache/kafka/pull/9112#issuecomment-671500806


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on pull request #9142: MINOR: Fix delete_topic for system tests

2020-08-10 Thread GitBox


skaundinya15 commented on pull request #9142:
URL: https://github.com/apache/kafka/pull/9142#issuecomment-671498810


   Link to system test run from this branch: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-08-07--001.1596866417--skaundinya15--minor-fix-delete-topic-implementation--387bd9ed9/report.html.
 The system test that uses `delete_topic` is 
   ```
   Module: kafkatest.tests.core.replica_scale_test
   Class:  ReplicaScaleTest
   Method: test_clean_bounce
   Arguments:
   {
 "partition_count": 34,
 "replication_factor": 3,
 "topic_count": 500
   }
   ```
   And from the link above this test passed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10382) MockProducer is not ThreadSafe, ideally it should be as the implementation it mocks is

2020-08-10 Thread Antony Stubbs (Jira)
Antony Stubbs created KAFKA-10382:
-

 Summary: MockProducer is not ThreadSafe, ideally it should be as 
the implementation it mocks is
 Key: KAFKA-10382
 URL: https://issues.apache.org/jira/browse/KAFKA-10382
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.6.0
Reporter: Antony Stubbs


In testing my project, I discovered that the MockProducer is not thread safe, 
as I had assumed it was. It doesn't use thread-safe collections for its 
underlying stores, and only _some_ of its methods are synchronised.

 

As performance isn’t an issue for this, I would propose simply synchronising 
all public methods in the class, as some already are.

 

In my project, send is synchronised and commitTransaction isn't. This was 
causing weird collection manipulation and messages going missing. My stopgap 
solution was simply to synchronise on the MockProducer instance before 
calling commit.

 

See my workaround: 
https://github.com/astubbs/async-consumer/pull/13/files#diff-8e93aa2a2003be7436f94956cf809b2eR558
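
 

A minimal sketch of that workaround (usage hypothetical, with a transactional 
MockProducer):

{code:java}
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.serialization.StringSerializer;

MockProducer<String, String> producer =
    new MockProducer<>(true, new StringSerializer(), new StringSerializer());
producer.initTransactions();
producer.beginTransaction();
// ... producer.send(...) may run concurrently on other threads ...
synchronized (producer) {
    producer.commitTransaction(); // guard the unsynchronised call ourselves
}
{code}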

 

PR available: https://github.com/apache/kafka/pull/9154



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-10 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174955#comment-17174955
 ] 

Ning Zhang commented on KAFKA-10370:


Hi [~rhauch], when you have a chance, I would like to get your initial feedback 
/ advice on this issue and proposed solution. Thanks cc [~ryannedolan]

> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.7.0
>
>
> In 
> [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java],
>  when we want the consumer to consume from certain offsets, rather than from 
> the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66]
>  provides a way to supply offsets from outside (e.g. an implementation of 
> SinkTask) to rewind the consumer. 
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
> it first calls 
> [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633]
>  to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are 
> not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.
> As a part of [WorkerSinkTask 
> initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307],
>  when the [SinkTask 
> starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88],
>  we can supply the specific offsets via +"context.offset(supplied_offsets);"+ 
> in the start() method, so that when the consumer does its first poll, it 
> should rewind to the specific offsets in the rewind() method. However, in practice we 
> saw the following IllegalStateException when running consumer.seek(tp, 
> offsets);
> {code:java}
> [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} 
> Rewind test-1 to offset 3 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
> [2020-08-07 23:53:55,752] INFO [Consumer 
> clientId=connector-consumer-MirrorSinkConnector-0, 
> groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:187)
> java.lang.IllegalStateException: No current assignment for partition test-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
> at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task 
> is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:188)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution (that has been initially verified) proposed in the attached 
> PR is to use *consumer.assign* with 
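
A minimal sketch of that assign-before-seek approach (assuming the consumer uses 
manual partition assignment via consumer.assign rather than consumer-group 
subscription; the helper name and variables are illustrative, not code from the 
attached PR):

{code:java}
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignBeforeSeek {
    // seek() throws IllegalStateException unless the partition currently has
    // assigned state, so register the partition with assign() before seeking.
    static void rewindTo(KafkaConsumer<byte[], byte[]> consumer,
                         TopicPartition tp, long offset) {
        Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
        if (!assignment.contains(tp)) {
            assignment.add(tp);
            consumer.assign(assignment); // manual assignment, no group rebalance
        }
        consumer.seek(tp, offset);       // the partition now has a current assignment
    }
}
{code}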

[GitHub] [kafka] big-andy-coates opened a new pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-08-10 Thread GitBox


big-andy-coates opened a new pull request #9156:
URL: https://github.com/apache/kafka/pull/9156


   fixes: [KAFKA-10077](https://issues.apache.org/jira/browse/KAFKA-10077).
   
   Enable sending old values on `KTable.filter` call to avoid the filter 
forwarding tombstones for rows that do not exist in the output.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10077) Filter downstream of state-store results in spurious tombstones

2020-08-10 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-10077:

Summary: Filter downstream of state-store results in spurious tombstones  
(was: Filter downstream of state-store results in suprious tombstones)

> Filter downstream of state-store results in spurious tombstones
> ---
>
> Key: KAFKA-10077
> URL: https://issues.apache.org/jira/browse/KAFKA-10077
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
>
> Adding a `filter` call downstream of anything that has a state store, e.g. a 
> table source, results in spurious tombstones being emitted from the topology 
> for any key where a new entry doesn't match the filter, _even when no 
> previous value existed for the row_.
> To put this another way: a filter downstream of a state-store will output a 
> tombstone on an INSERT that doesn't match the filter, when it should only 
> output a tombstone on an UPDATE.
>  
> This code shows the problem:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> builder
>  .table("table", Materialized.with(Serdes.Long(), Serdes.Long()))
>  .filter((k, v) -> v % 2 == 0)
>  .toStream()
>  .to("bob");
> final Topology topology = builder.build();
> final Properties props = new Properties();
> props.put("application.id", "fred");
> props.put("bootstrap.servers", "who cares");
> final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
> final TestInputTopic<Long, Long> input = driver
>  .createInputTopic("table", Serdes.Long().serializer(), 
> Serdes.Long().serializer());
> input.pipeInput(1L, 2L);
> input.pipeInput(1L, 1L);
> input.pipeInput(2L, 1L);
> final TestOutputTopic<Long, Long> output = driver
>  .createOutputTopic("bob", Serdes.Long().deserializer(), 
> Serdes.Long().deserializer());
> final List<KeyValue<Long, Long>> keyValues = output.readKeyValuesToList();
> // keyValues contains:
> // 1 -> 1
> // 1 -> null <-- correct tombstone: deletes previous row.
> // 2 -> null <-- spurious tombstone: no previous row. 
> {code}
>  
> These spurious tombstones can cause a LOT of noise when, for example, the 
> filter is looking for a specific key.  In such a situation, _every input 
> record that does not have that key results in a tombstone!_, meaning there are 
> many more tombstones than useful data.
>  I believe the fix is to turn on {{KTableImpl::enableSendingOldValues}} for 
> any filter that is downstream of a state store. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
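
To make the intended semantics concrete, here is a minimal sketch of what the 
filter can do once old values are forwarded to it (an illustrative helper, not 
the actual KTableImpl/KTableFilter code):

{code:java}
import java.util.function.BiPredicate;

import org.apache.kafka.streams.KeyValue;

public class FilterSemanticsSketch {
    // With the old value available, an INSERT (oldValue == null) that fails the
    // predicate can be dropped entirely, while an UPDATE that falls out of the
    // filtered view still yields a tombstone. Returning null means "emit nothing".
    static <K, V> KeyValue<K, V> filterResult(K key, V newValue, V oldValue,
                                              BiPredicate<K, V> predicate) {
        boolean newMatches = newValue != null && predicate.test(key, newValue);
        boolean oldMatches = oldValue != null && predicate.test(key, oldValue);
        if (newMatches) {
            return KeyValue.pair(key, newValue); // row (still) passes the filter
        }
        if (oldMatches) {
            return KeyValue.pair(key, null);     // UPDATE out of the view: tombstone
        }
        return null;                             // INSERT that never matched: no output
    }
}
{code}

Applied to the example above, the input (2, 1) would take the third branch and 
produce no output, eliminating the spurious tombstone.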


[jira] [Issue Comment Deleted] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-10 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Comment: was deleted

(was: https://github.com/apache/kafka/pull/9145)


[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-10 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Reviewer: Randall Hauch


[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-10 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Fix Version/s: (was: 2.6.0)
   2.7.0


[GitHub] [kafka] ijuma commented on pull request #9155: MINOR: Ensure a single version of scala-library is used

2020-08-10 Thread GitBox


ijuma commented on pull request #9155:
URL: https://github.com/apache/kafka/pull/9155#issuecomment-671482140


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogesh BG updated KAFKA-10381:
--
Affects Version/s: 2.3.0

> Add broker to a cluster not rebalancing partitions
> --
>
> Key: KAFKA-10381
> URL: https://issues.apache.org/jira/browse/KAFKA-10381
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0
>Reporter: Yogesh BG
>Priority: Major
>
> Hi,
> I have a 3-node cluster and a topic with one partition. When a node is deleted 
> and another node is added, the topic goes into an unknown state and nothing can 
> be written or read; the exception below is seen:
>  
> {code:java}
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition C-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1002,1004 for partition B-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 
> failed to record follower 1005's position 0 since the replica is not 
> recognized to be one of the assigned replicas 1003,1004 for partition A-0. 
> Empty records will be returned for this partition. 
> (kafka.server.ReplicaManager)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
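
Note that Kafka does not move existing partition replicas onto a newly added 
broker on its own; a reassignment has to be triggered explicitly (for example 
with the kafka-reassign-partitions.sh tool). A minimal sketch using the Admin 
API available from Kafka 2.4 onwards (broker IDs, topic name, and bootstrap 
address are illustrative):

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class MovePartitionToNewBroker {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Move partition A-0 onto replicas {1004, 1005}, e.g. to include
            // the newly added broker 1005.
            Map<TopicPartition, Optional<NewPartitionReassignment>> plan =
                Collections.singletonMap(
                    new TopicPartition("A", 0),
                    Optional.of(new NewPartitionReassignment(Arrays.asList(1004, 1005))));
            admin.alterPartitionReassignments(plan).all().get();
        }
    }
}
{code}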


[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-08-10 Thread GitBox


junrao commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r467279790



##
File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json
##
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "request",
+  "name": "UpdateFeaturesRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+",
+  "about": "The list of updates to finalized features.", "fields": [
+  {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true,
+"about": "The name of the finalized feature to be updated."},
+  {"name": "MaxVersionLevel", "type": "int16", "versions": "0+",
+"about": "The new maximum version level for the finalized feature. A 
value >= 1 is valid. A value < 1, is special, and can be used to request the 
deletion of the finalized feature."},
+  {"name": "AllowDowngrade", "type": "bool", "versions": "0+",

Review comment:
   The KIP wiki has AllowDowngrade at the topic level. Could we update that?
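
   For orientation, a sketch of the client-side call that this request schema is 
meant to back, per KIP-584 (names follow the KIP and this PR and may still 
change while the PR is under review; the feature name is illustrative):
   
   ```java
   import java.util.Collections;
   import java.util.Map;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.FeatureUpdate;
   import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
   
   public class UpdateFeaturesExample {
       // "admin" is an already-configured Admin client.
       static void finalizeFeature(Admin admin) throws Exception {
           Map<String, FeatureUpdate> updates = Collections.singletonMap(
               "example_feature",
               new FeatureUpdate((short) 2, false)); // maxVersionLevel=2, allowDowngrade=false
           admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();
       }
   }
   ```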

##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+    info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode")
+    zkClient.createFeatureZNode(newNode)
+    val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+    newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+    info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode")
+    zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the 

[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogesh BG updated KAFKA-10381:
--
Priority: Major  (was: Trivial)

> Add broker to a cluster not rebalancing partitions
> --
>
> Key: KAFKA-10381
> URL: https://issues.apache.org/jira/browse/KAFKA-10381
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yogesh BG
>Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)
Yogesh BG created KAFKA-10381:
-

 Summary: Add broker to a cluster not rebalancing partitions
 Key: KAFKA-10381
 URL: https://issues.apache.org/jira/browse/KAFKA-10381
 Project: Kafka
  Issue Type: Bug
Reporter: Yogesh BG





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions

2020-08-10 Thread Yogesh BG (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogesh BG updated KAFKA-10381:
--
Priority: Trivial  (was: Major)

> Add broker to a cluster not rebalancing partitions
> --
>
> Key: KAFKA-10381
> URL: https://issues.apache.org/jira/browse/KAFKA-10381
> Project: Kafka
>  Issue Type: Bug
>Reporter: Yogesh BG
>Priority: Trivial
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-10 Thread GitBox


rajinisivaram commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r467848781



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * Representation of a SASL/SCRAM Mechanism.
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554:
 Add Broker-side SCRAM Config API</a>
+ */
+public enum ScramMechanism {
+UNKNOWN((byte) 0),
+SCRAM_SHA_256((byte) 1),
+SCRAM_SHA_512((byte) 2);
+
+/**
+ *
+ * @param type the type indicator
+ * @return the instance corresponding to the given type indicator, 
otherwise {@link #UNKNOWN}
+ */
+public static ScramMechanism fromType(byte type) {
+for (ScramMechanism scramMechanism : ScramMechanism.values()) {
+if (scramMechanism.type == type) {
+return scramMechanism;
+}
+}
+return UNKNOWN;
+}
+
+/**
+ *
+ * @param mechanismName the SASL SCRAM mechanism name
+ * @return the corresponding SASL SCRAM mechanism enum, otherwise {@link 
#UNKNOWN}
+ * @see <a href="https://tools.ietf.org/html/rfc5802#section-4">
+ * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms, Section 4</a>
+ */
+public static ScramMechanism fromMechanismName(String mechanismName) {
+ScramMechanism retvalFoundMechanism = 
ScramMechanism.valueOf(mechanismName.replace('-', '_'));
+return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN;
+}
+
+/**
+ *
+ * @return the corresponding SASL SCRAM mechanism name
+ * @see <a href="https://tools.ietf.org/html/rfc5802#section-4">
+ * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and 
GSS-API Mechanisms, Section 4</a>
+ */
+public String getMechanismName() {

Review comment:
   We don't use `get` prefix elsewhere, just `mechanismName()`?
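
   For reference, a short usage sketch of the enum as quoted above (assuming the 
PR code as-is; note that the valueOf-based lookup in fromMechanismName throws 
IllegalArgumentException for names that do not map to a constant, so the UNKNOWN 
fallback there is never actually reached):

   ```java
   import org.apache.kafka.clients.admin.ScramMechanism;

   public class ScramMechanismExample {
       public static void main(String[] args) {
           // Mapping from the wire type indicator, per the quoted fromType().
           ScramMechanism byType = ScramMechanism.fromType((byte) 1);  // SCRAM_SHA_256
           // Mapping from the SASL mechanism name, per the quoted fromMechanismName().
           ScramMechanism byName = ScramMechanism.fromMechanismName("SCRAM-SHA-512");
           System.out.println(byType + " " + byName);
       }
   }
   ```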

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,64 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describe all SASL/SCRAM credentials.
+ *
+ * This is a convenience method for {@link 
#describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @return The DescribeUserScramCredentialsResult.
+ */
+default DescribeUserScramCredentialsResult describeUserScramCredentials() {
+return describeUserScramCredentials(null, new 
DescribeUserScramCredentialsOptions());
+}
+
+/**
+ * Describe SASL/SCRAM credentials for the given users.
+ *
+ * This is a convenience method for {@link 
#describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)}
+ *
+ * @param users the users for which credentials are to be described; all 
users' credentials are described if null
+ *  or empty.  A user explicitly specified here that does not 
have a SCRAM credential will not appear
+ *  in the results.

Review comment:
   Should we throw an exception for users which don't exist to be 
consistent with other APIs?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-10 Thread GitBox


abbccdda commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r468005080



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -88,15 +88,17 @@ class SocketServer(val config: KafkaConfig,
   private val memoryPoolDepletedTimeMetricName = 
metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, 
memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
-  // data-plane
-  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, 
Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, 
DataPlaneMetricPrefix, time)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = 
config.controlPlaneListenerName.map(_ =>
-new RequestChannel(20, ControlPlaneMetricPrefix, time))
+new RequestChannel(20, ControlPlaneMetricPrefix, time, true))
+  // data-plane
+  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, 
Acceptor]()
+  // If the control plane processor is not defined, just set the flag to true 
in data plane to bypass the check for whether a given
+  // request is from the control plane or not.

Review comment:
   What if the user doesn't configure an inter-broker listener?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on pull request #9155: MINOR: Ensure a single version of scala-library is used

2020-08-10 Thread GitBox


stanislavkozlovski commented on pull request #9155:
URL: https://github.com/apache/kafka/pull/9155#issuecomment-671429610


   In 2.4 we use `def defaultScala212Version = '2.12.10'` yet it gets updated 
to `2.12.12` due to the jackson-module-scala library using the 2.12.12 version 
of scala-library in its 2.10.5 version:
   
https://github.com/FasterXML/jackson-module-scala/blob/0cfeb8d27195a357887fa99f8915cfaa519aabc9/build.sbt#L8



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski opened a new pull request #9155: MINOR: Ensure a single version of scala-library is used

2020-08-10 Thread GitBox


stanislavkozlovski opened a new pull request #9155:
URL: https://github.com/apache/kafka/pull/9155


   This patch ensures we use a force resolution strategy for the scala-library 
dependency.
   
   I've tested this locally and saw a difference in the output:
   With the change (using 2.4 and the jackson library **2.10.5**):
   ```
   ./core/build/dependant-libs-2.12.10/scala-java8-compat_2.12-0.9.0.jar
   ./core/build/dependant-libs-2.12.10/scala-collection-compat_2.12-2.1.2.jar
   ./core/build/dependant-libs-2.12.10/scala-reflect-2.12.10.jar
   ./core/build/dependant-libs-2.12.10/scala-logging_2.12-3.9.2.jar
   ./core/build/dependant-libs-2.12.10/scala-library-2.12.10.jar
   ```
   Without (using 2.4 and the jackson library **2.10.0**):
   ```
find . -name 'scala*.jar'
   ./core/build/dependant-libs-2.12.10/scala-java8-compat_2.12-0.9.0.jar
   ./core/build/dependant-libs-2.12.10/scala-collection-compat_2.12-2.1.2.jar
   ./core/build/dependant-libs-2.12.10/scala-reflect-2.12.10.jar
   ./core/build/dependant-libs-2.12.10/scala-logging_2.12-3.9.2.jar
   ./core/build/dependant-libs-2.12.10/scala-library-2.12.12.jar
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9136: KAFKA-10211: Add DirectoryConfigProvider

2020-08-10 Thread GitBox


tombentley commented on pull request #9136:
URL: https://github.com/apache/kafka/pull/9136#issuecomment-671424344


   @mimaison done.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #9136: KAFKA-10211: Add DirectoryConfigProvider

2020-08-10 Thread GitBox


tombentley commented on a change in pull request #9136:
URL: https://github.com/apache/kafka/pull/9136#discussion_r467984879



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An implementation of {@link ConfigProvider} based on a directory of files.
+ * Property keys correspond to the names of the regular (i.e. non-directory)
+ * files in a directory given by the path parameter.
+ * Property values are taken from the file contents corresponding to each key.
+ */
+public class DirectoryConfigProvider implements ConfigProvider {
+
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+@Override
+public void configure(Map<String, ?> configs) { }
+
+@Override
+public void close() throws IOException { }
+
+/**
+ * Retrieves the data contained in regular files in the directory given by 
{@code path}.
+ * Non-regular files (such as directories) in the given directory are 
silently ignored.
+ * @param path the directory where data files reside.
+ * @return the configuration data.
+ */
+@Override
+public ConfigData get(String path) {
+return get(path, File::isFile);
+}
+
+/**
+ * Retrieves the data contained in the regular files named by {@code keys} 
in the directory given by {@code path}.
+ * Non-regular files (such as directories) in the given directory are 
silently ignored.
+ * @param path the directory where data files reside.
+ * @param keys the keys whose values will be retrieved.
+ * @return the configuration data.
+ */
+@Override
+public ConfigData get(String path, Set<String> keys) {
+return get(path, pathname ->
+pathname.isFile()
+&& keys.contains(pathname.getName()));
+}
+
+private ConfigData get(String path, FileFilter fileFilter) {
+Map<String, String> map = new HashMap<>();
+if (path != null && !path.isEmpty()) {
+File dir = new File(path);
+if (!dir.isDirectory()) {
+log.warn("The path {} is not a directory", path);
+} else {
+for (File file : dir.listFiles(fileFilter)) {

Review comment:
   That's a good point about null being used for the error case as well as 
the directory case. Thanks.
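
   A minimal usage sketch for the provider shown above (the directory path and 
file name are illustrative): each regular file in the directory becomes one key, 
with the file contents as its value.

   ```java
   import java.util.Collections;

   import org.apache.kafka.common.config.ConfigData;
   import org.apache.kafka.common.config.provider.DirectoryConfigProvider;

   public class DirectoryConfigProviderExample {
       public static void main(String[] args) throws Exception {
           DirectoryConfigProvider provider = new DirectoryConfigProvider();
           provider.configure(Collections.emptyMap());
           // Reads every regular file under /mnt/secrets; a file named
           // "keystore-password" yields the key "keystore-password".
           ConfigData data = provider.get("/mnt/secrets");
           String password = data.data().get("keystore-password");
           System.out.println(password != null);
           provider.close();
       }
   }
   ```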





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-10 Thread GitBox


abbccdda commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467975757



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##
@@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) {
 }
 
 public RequestHeader(ApiKeys requestApiKey, short requestVersion, String 
clientId, int correlationId) {
-this(new RequestHeaderData().
-setRequestApiKey(requestApiKey.id).
-setRequestApiVersion(requestVersion).
-setClientId(clientId).
-setCorrelationId(correlationId),
+this(requestApiKey, requestVersion, clientId, correlationId, null, 
null);
+}
+
+public RequestHeader(ApiKeys requestApiKey,

Review comment:
   We do have a constructor which doesn't contain the two fields above.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-10 Thread GitBox


abbccdda commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467975394



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##
@@ -69,4 +69,11 @@
  * Returns the correlation id from the request header.
  */
 int correlationId();
+
+/**
+ * Returns the initial principal name for a forwarded request.
+ */
+default String initialPrincipalName() {

Review comment:
   Yea, we need to have it here for audit logging. We could of course have 
the meta comment suggest "do not use for authorization"





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9073: MINOR: add task ':streams:testAll'

2020-08-10 Thread GitBox


vvcephei commented on a change in pull request #9073:
URL: https://github.com/apache/kafka/pull/9073#discussion_r467975526



##
File path: build.gradle
##
@@ -1266,6 +1266,27 @@ project(':streams') {
 if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
 standardOutput = new File(generatedDocsDir, 
"streams_config.html").newOutputStream()
   }
+
+  task testAll(
+dependsOn: [
+':streams:test',
+':streams:test-utils:test',
+':streams:streams-scala:test',
+':streams:upgrade-system-tests-0100:test',
+':streams:upgrade-system-tests-0101:test',
+':streams:upgrade-system-tests-0102:test',
+':streams:upgrade-system-tests-0110:test',
+':streams:upgrade-system-tests-10:test',
+':streams:upgrade-system-tests-11:test',
+':streams:upgrade-system-tests-20:test',
+':streams:upgrade-system-tests-21:test',
+':streams:upgrade-system-tests-22:test',
+':streams:upgrade-system-tests-23:test',
+':streams:upgrade-system-tests-24:test',
+':streams:upgrade-system-tests-25:test',

Review comment:
   Thanks for the comment anyway.
   
   I was on the fence about it, but I went with including these projects just 
to be sure that `testAll` really tests all of the components of `:streams`. 
Even though they don't have tests, the `:test` target is convenient because it 
runs all of the compilation targets, as well as spotbugs and checkstyle. The 
idea is just to flush out any/all possible sources of failure so that we can be 
pretty sure that if `:streams:testAll` passes, then so will Jenkins.
   
   I agree it's a pain. Maybe we can consider either contributing a script to 
keep this stuff updated or writing some gradle code to build the list 
automatically.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] astubbs opened a new pull request #9154: Threadsafe mock producer

2020-08-10 Thread GitBox


astubbs opened a new pull request #9154:
URL: https://github.com/apache/kafka/pull/9154


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-08-10 Thread GitBox


chia7712 commented on pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#issuecomment-671337544


   @guozhangwang @abbccdda More comments? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9153: MINOR: Fix state transition diagram for stream threads

2020-08-10 Thread GitBox


cadonna commented on pull request #9153:
URL: https://github.com/apache/kafka/pull/9153#issuecomment-671328193


   Call for review: @guozhangwang @ableegoldman 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9153: MINOR: Fix state transition diagram for stream threads

2020-08-10 Thread GitBox


cadonna commented on a change in pull request #9153:
URL: https://github.com/apache/kafka/pull/9153#discussion_r467872227



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -80,11 +80,13 @@
  *  |  +-+---+
  *  +< | Starting (1)|->+
  *  |  +-+---+  |
- *  ||  |
- *  ||  |
- *  |v  |
- *  |  +-+---+  |
- *  +< | Partitions  |  |
+ *  |   |
+ *  |   |
+ *  |+<--+  |
+ *  ||   |  |
+ *  |v   |  |
+ *  |  +-+---+   |  |
+ *  +< | Partitions  | --+  |

Review comment:
   This valid loop was only mentioned in the notes but was not shown in the 
diagram.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9153: MINOR: Fix state transition diagram for stream threads

2020-08-10 Thread GitBox


cadonna commented on a change in pull request #9153:
URL: https://github.com/apache/kafka/pull/9153#discussion_r467871730



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -93,7 +95,7 @@
  *  |  +-+---+  |
  *  +< | Partitions  |  |
  *  |  | Assigned (3)| <+
- *  |  +-+---+  |
+ *  |  +-+---+  ^

Review comment:
   Before this change a transition from `STARTING` to `RUNNING` was 
possible in the diagram. However, such a transition is not valid.









[GitHub] [kafka] cadonna opened a new pull request #9153: MINOR: Fix state transition diagram for stream threads

2020-08-10 Thread GitBox


cadonna opened a new pull request #9153:
URL: https://github.com/apache/kafka/pull/9153


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure

2020-08-10 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174279#comment-17174279
 ] 

Bruno Cadonna commented on KAFKA-10357:
---

I have a couple of questions regarding wild idea 1):

1) I guess you mean STARTING and not CREATED, don't you? There is no transition 
from CREATED to PARTITION_REVOKED or PARTITION_ASSIGNED.

2) I suppose this check on the states of the stream threads is done in the 
group leader. If a Streams client joined an existing group and a stream thread 
of this newly added Streams client were elected as the group leader, then we 
would have the situation where the stream thread is in STARTING but it would 
not be the first-ever rebalance. Is this correct?

> Handle accidental deletion of repartition-topics as exceptional failure
> ---
>
> Key: KAFKA-10357
> URL: https://issues.apache.org/jira/browse/KAFKA-10357
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> Repartition topics are both written by Stream's producer and read by Stream's 
> consumer, so when they are accidentally deleted both clients may be notified. 
> But in practice the consumer would react to it much more quickly than the 
> producer, since the latter has a delivery timeout expiration period (see 
> https://issues.apache.org/jira/browse/KAFKA-10356). When the consumer reacts to 
> it, it will re-join the group since the metadata changed, and during the triggered 
> rebalance it would auto-recreate the topic silently and continue, causing 
> silent data loss. 
> One idea is to only create all repartition topics *once*, in the first 
> rebalance, and not auto-create them any more in future rebalances; instead it 
> would be treated similarly to the INCOMPLETE_SOURCE_TOPIC_METADATA error code 
> (https://issues.apache.org/jira/browse/KAFKA-10355).
> The challenge part would be, how to determine if it is the first-ever 
> rebalance, and there are several wild ideas I'd like to throw out here:
> 1) change the thread state transition diagram so that the CREATED state would not 
> transition to PARTITION_REVOKED but only to PARTITION_ASSIGNED; then in the 
> assign function we can check if the state is still in CREATED and not RUNNING.
> 2) augment the subscriptionInfo to encode whether or not this is the 
> first-ever rebalance.





[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9150: KAFKA-9839; Broker should accept control requests with newer broker epoch

2020-08-10 Thread GitBox


viktorsomogyi edited a comment on pull request #9150:
URL: https://github.com/apache/kafka/pull/9150#issuecomment-671323114


   2.3: https://github.com/apache/kafka/pull/9151
   2.4: https://github.com/apache/kafka/pull/9152







[GitHub] [kafka] viktorsomogyi commented on pull request #9150: KAFKA-9839; Broker should accept control requests with newer broker epoch

2020-08-10 Thread GitBox


viktorsomogyi commented on pull request #9150:
URL: https://github.com/apache/kafka/pull/9150#issuecomment-671323114


   2.3: https://github.com/apache/kafka/pull/9151
   2.4: https://github.com/apache/kafka/pull/9151







[GitHub] [kafka] viktorsomogyi opened a new pull request #9152: KAFKA-9839; Broker should accept control requests with newer broker epoch

2020-08-10 Thread GitBox


viktorsomogyi opened a new pull request #9152:
URL: https://github.com/apache/kafka/pull/9152


   A broker throws IllegalStateException if the broker epoch in the 
LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its 
current broker epoch. However, there is no guarantee that the broker would 
receive the latest broker epoch before the controller: when the broker 
registers with ZK, there are a few more instructions to process before this 
broker "knows" about its epoch, while the controller may already get notified 
and send an UPDATE_METADATA request (as an example) with the new epoch. This will 
result in clients getting stale metadata from this broker.
   
   With this PR, a broker accepts 
LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is 
newer than the current epoch.
   
   Reviewers: David Jacot , Jason Gustafson 

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] viktorsomogyi opened a new pull request #9151: KAFKA-9839; Broker should accept control requests with newer broker epoch

2020-08-10 Thread GitBox


viktorsomogyi opened a new pull request #9151:
URL: https://github.com/apache/kafka/pull/9151


   A broker throws IllegalStateException if the broker epoch in the 
LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its 
current broker epoch. However, there is no guarantee that the broker would 
receive the latest broker epoch before the controller: when the broker 
registers with ZK, there are a few more instructions to process before this 
broker "knows" about its epoch, while the controller may already get notified 
and send an UPDATE_METADATA request (as an example) with the new epoch. This will 
result in clients getting stale metadata from this broker.
   
   With this PR, a broker accepts 
LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is 
newer than the current epoch.
   
   Reviewers: David Jacot , Jason Gustafson 

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-10223) ReplicaNotAvailableException must be retriable to handle reassignments

2020-08-10 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174223#comment-17174223
 ] 

Rajini Sivaram commented on KAFKA-10223:


[~dongjoon] This is not a 2.6.0 issue; ReplicaNotAvailable has always been 
non-retriable. This turned out to be a bigger issue in non-Java consumers since 
fetch-from-follower was introduced in 2.4.0. Java consumers always handled 
this case correctly and were not affected.

> ReplicaNotAvailableException must be retriable to handle reassignments
> --
>
> Key: KAFKA-10223
> URL: https://issues.apache.org/jira/browse/KAFKA-10223
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.6.0
>
>
> ReplicaNotAvailableException should be a retriable `InvalidMetadataException` 
> since consumers may throw this during reassignments.
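For context, the fix amounts to moving the exception under the retriable
metadata-exception hierarchy. A minimal Scala sketch of the idea, with
illustrative stand-in classes (the real ones are Java classes in
org.apache.kafka.common.errors):

{noformat}
// Stand-ins for illustration only; not the actual Kafka class definitions.
class RetriableException(message: String) extends RuntimeException(message)
class InvalidMetadataException(message: String) extends RetriableException(message)

// Once ReplicaNotAvailableException extends InvalidMetadataException, clients
// treat it as a cue to refresh metadata and retry the fetch, instead of
// surfacing a fatal error during a reassignment.
class ReplicaNotAvailableException(message: String) extends InvalidMetadataException(message)
{noformat}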





[jira] [Commented] (KAFKA-10377) Delete Useless Code

2020-08-10 Thread Bingkun.ji (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174202#comment-17174202
 ] 

Bingkun.ji commented on KAFKA-10377:


[~dongjin]  [~tombentley]   

Thank you for your reply

> Delete Useless Code
> ---
>
> Key: KAFKA-10377
> URL: https://issues.apache.org/jira/browse/KAFKA-10377
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.6.0
>Reporter: Bingkun.ji
>Priority: Trivial
> Attachments: image-2020-08-10-00-13-28-744.png
>
>
> delete useless code for client
>  
> !image-2020-08-10-00-13-28-744.png!





[jira] [Resolved] (KAFKA-10377) Delete Useless Code

2020-08-10 Thread Bingkun.ji (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bingkun.ji resolved KAFKA-10377.

Resolution: Not A Problem

> Delete Useless Code
> ---
>
> Key: KAFKA-10377
> URL: https://issues.apache.org/jira/browse/KAFKA-10377
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.6.0
>Reporter: Bingkun.ji
>Priority: Trivial
> Attachments: image-2020-08-10-00-13-28-744.png
>
>
> delete useless code for client
>  
> !image-2020-08-10-00-13-28-744.png!





[jira] [Commented] (KAFKA-10377) Delete Useless Code

2020-08-10 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174183#comment-17174183
 ] 

Dongjin Lee commented on KAFKA-10377:
-

[~tombentley] is right. That code is not useless.

[~Bingkun.ji] Please close this issue with 'Not a problem'.

> Delete Useless Code
> ---
>
> Key: KAFKA-10377
> URL: https://issues.apache.org/jira/browse/KAFKA-10377
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.6.0
>Reporter: Bingkun.ji
>Priority: Trivial
> Attachments: image-2020-08-10-00-13-28-744.png
>
>
> delete useless code for client
>  
> !image-2020-08-10-00-13-28-744.png!





[jira] [Assigned] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories

2020-08-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-8362:


Assignee: Luke Chen

> LogCleaner gets stuck after partition move between log directories
> --
>
> Key: KAFKA-8362
> URL: https://issues.apache.org/jira/browse/KAFKA-8362
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, log cleaner
>Reporter: Julio Ng
>Assignee: Luke Chen
>Priority: Major
>
> When a partition is moved from one directory to another, its checkpoint 
> entry in the cleaner-offset-checkpoint file is not removed from the source 
> directory.
> As a consequence, when we read the last firstDirtyOffset, we might get a stale 
> value from the old checkpoint file.
> Basically, we need to clean up the entry from the checkpoint file in the source 
> directory when the move is completed.
> The current issue is that the code in LogCleanerManager:
> {noformat}
> /**
>  * @return the position processed for all logs.
>  */
> def allCleanerCheckpoints: Map[TopicPartition, Long] = {
>   inLock(lock) {
>     checkpoints.values.flatMap(checkpoint => {
>       try {
>         checkpoint.read()
>       } catch {
>         case e: KafkaStorageException =>
>           error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
>           Map.empty[TopicPartition, Long]
>       }
>     }).toMap
>   }
> }
> {noformat}
> collapses the offsets when multiple entries exist for the topicPartition
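To see why duplicate entries are a problem here: Scala's {{toMap}} keeps only
the last value seen for a duplicated key. A minimal sketch with made-up offsets
(plain Scala, no Kafka types):

{noformat}
object CheckpointCollapseDemo extends App {
  // Both the source and the destination log directory still have a
  // cleaner-offset-checkpoint entry for the same partition.
  val sourceDirCheckpoint = Map("topic-0" -> 42L)   // stale entry left behind
  val destDirCheckpoint   = Map("topic-0" -> 100L)  // current entry

  // flatMap(...).toMap, as in allCleanerCheckpoints: the last entry for a
  // duplicated key wins, and the iteration order over checkpoints.values is
  // not guaranteed, so the stale offset may be the one that survives.
  val merged = Seq(sourceDirCheckpoint, destDirCheckpoint).flatMap(_.toSeq).toMap
  println(merged) // Map(topic-0 -> 100) in this order, but order-dependent
}
{noformat}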





[GitHub] [kafka] showuon commented on a change in pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-08-10 Thread GitBox


showuon commented on a change in pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#discussion_r467755721



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -430,7 +434,7 @@ private void maybeCreateTopic(String topic) {
 log.info("Created topic '{}' using creation group {}", newTopic, 
topicGroup);
 } else {
 log.warn("Request to create new topic '{}' failed", topic);
-throw new ConnectException("Task failed to create new topic " + 
topic + ". Ensure "
+throw new ConnectException("Task failed to create new topic " + 
newTopic + ". Ensure "

Review comment:
should be `newTopic` here for the error message.









[GitHub] [kafka] viktorsomogyi commented on pull request #9150: KAFKA-9839; Broker should accept control requests with newer broker epoch

2020-08-10 Thread GitBox


viktorsomogyi commented on pull request #9150:
URL: https://github.com/apache/kafka/pull/9150#issuecomment-671213384


   @apovzner @hachikuji would you please review this backport?
   And while I'm at it, I can backport these onto the 2.3 and 2.4 lines as well.







[jira] [Commented] (KAFKA-10377) Delete Useless Code

2020-08-10 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174157#comment-17174157
 ] 

Tom Bentley commented on KAFKA-10377:
-

This is used to generate the [protocol 
documentation|https://kafka.apache.org/protocol], so it's not useless.

> Delete Useless Code
> ---
>
> Key: KAFKA-10377
> URL: https://issues.apache.org/jira/browse/KAFKA-10377
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.6.0
>Reporter: Bingkun.ji
>Priority: Trivial
> Attachments: image-2020-08-10-00-13-28-744.png
>
>
> delete useless code for client
>  
> !image-2020-08-10-00-13-28-744.png!





[GitHub] [kafka] viktorsomogyi opened a new pull request #9150: KAFKA-9839; Broker should accept control requests with newer broker epoch

2020-08-10 Thread GitBox


viktorsomogyi opened a new pull request #9150:
URL: https://github.com/apache/kafka/pull/9150


   This is a backport of #8509.
   
   A broker throws IllegalStateException if the broker epoch in the 
LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its 
current broker epoch. However, there is no guarantee that the broker would 
receive the latest broker epoch before the controller: when the broker 
registers with ZK, there are a few more instructions to process before this 
broker "knows" about its epoch, while the controller may already get notified 
and send an UPDATE_METADATA request (as an example) with the new epoch. This will 
result in clients getting stale metadata from this broker.
   
   With this PR, a broker accepts 
LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is 
newer than the current epoch.
   
   Reviewers: David Jacot , Jason Gustafson 

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Updated] (KAFKA-9839) IllegalStateException on metadata update when broker learns about its new epoch after the controller

2020-08-10 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-9839:
---
Affects Version/s: 2.2.1

I detected this issue in a customer's environment and backported the fix to 
2.2.1; it seemed to fix it for them. I'll publish my solution upstream soon.

> IllegalStateException on metadata update when broker learns about its new 
> epoch after the controller
> 
>
> Key: KAFKA-9839
> URL: https://issues.apache.org/jira/browse/KAFKA-9839
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 2.2.1, 2.3.1, 2.5.0, 2.4.1
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Critical
> Fix For: 2.5.1
>
>
> Broker throws "java.lang.IllegalStateException: Epoch XXX larger than current 
> broker epoch YYY"  on UPDATE_METADATA when the controller learns about the 
> broker epoch and sends UPDATE_METADATA before KafkaZkCLient.registerBroker 
> completes (the broker learns about its new epoch).
> Here is the scenario we observed in more detail:
> 1. ZK session expires on broker 1
> 2. Broker 1 establishes new session to ZK and creates znode
> 3. Controller learns about broker 1 and assigns epoch
> 4. Broker 1 receives UPDATE_METADATA from controller, but it does not know 
> about its new epoch yet, so we get an exception:
> ERROR [KafkaApi-3] Error when handling request: clientId=1, correlationId=0, 
> api=UPDATE_METADATA, body={
> .
> java.lang.IllegalStateException: Epoch XXX larger than current broker epoch 
> YYY at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2725) at 
> kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:320) at 
> kafka.server.KafkaApis.handle(KafkaApis.scala:139) at 
> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
> java.lang.Thread.run(Thread.java:748)
> 5. KafkaZkClient.registerBroker completes on broker 1: "INFO Stat of the 
> created znode at /brokers/ids/1"
> The result is the broker has a stale metadata for some time.
> Possible solutions:
> 1. Broker returns a more specific error and controller retries UPDATE_MEDATA
> 2. Broker accepts UPDATE_METADATA with larger broker epoch.
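Solution 2 is the direction the linked PRs take. A hypothetical Scala sketch of
the relaxed check (names are illustrative, not the actual KafkaApis code):

{noformat}
// Before the fix, an epoch in the request *larger* than the broker's own epoch
// raised IllegalStateException; relaxing the check to reject only strictly
// older epochs lets a control request that races ahead of the broker's own ZK
// registration be accepted.
def isBrokerEpochStale(epochInRequest: Long, currentBrokerEpoch: Long): Boolean =
  epochInRequest < currentBrokerEpoch
{noformat}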





[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id

2020-08-10 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174125#comment-17174125
 ] 

Luke Chen commented on KAFKA-10038:
---

This might need a KIP. Working on that.

> ConsumerPerformance.scala supports the setting of client.id
> ---
>
> Key: KAFKA-10038
> URL: https://issues.apache.org/jira/browse/KAFKA-10038
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Affects Versions: 2.1.1
> Environment: Trunk branch
>Reporter: tigertan
>Assignee: Luke Chen
>Priority: Minor
>  Labels: newbie, performance
> Fix For: 2.7.0
>
>
> ConsumerPerformance.scala should support setting "client.id", which is a 
> reasonable requirement, and the way "console consumer" and "console producer" 
> handle "client.id" can be unified. "client.id" defaults to 
> "perf-consumer-client".
> We often use client.id in quotas; if the kafka-producer-perf-test.sh script 
> supported setting "client.id", we could do quota testing through scripts 
> without writing our own consumer programs. 





[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-08-10 Thread GitBox


showuon commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-671191319


   @kkonstantine , could you help review this PR to improve logging? Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-08-10 Thread GitBox


showuon commented on a change in pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#discussion_r467713932



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -363,6 +363,7 @@ private boolean sendRecords() {
 try {
 maybeCreateTopic(record.topic());
 final String topic = producerRecord.topic();
+log.trace("{} is going to send record to {}", 
WorkerSourceTask.this, topic);

Review comment:
If the user enables trace logging, they'll see that the producer is about to 
send the record before producer.send gets stuck.









[jira] [Comment Edited] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2020-08-10 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17174105#comment-17174105
 ] 

Luke Chen edited comment on KAFKA-10340 at 8/10/20, 6:45 AM:
-

After investigation: if {{auto.create.topics.enable}} is disabled, producer.send 
will get only a {{TimeoutException}} and no other valuable clue for users. So, I 
will improve the logging in this area.

Proposed to add 2 logs:

1. When topic creation is disabled or the topic already exists:

log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:

log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

 

As for the request to let the user know it's stuck waiting for the destination 
topic to be created: this basically cannot be determined from the client side, 
because producer.send blocks in the waitOnMetadata method, which keeps retrying 
until it times out. There are many possible reasons for this timeout, so it's hard to tell.
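For illustration, a minimal Scala sketch of what the client sees, assuming a
local broker with auto.create.topics.enable=false and a made-up topic name:

{noformat}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object MissingTopicSendDemo extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // assumption: local broker
  props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put("max.block.ms", "10000") // bound the metadata wait (default is 60s)

  val producer = new KafkaProducer[String, String](props)
  try {
    // send() blocks in waitOnMetadata while metadata for the missing topic
    // never arrives, then fails with a TimeoutException that gives no hint
    // that topic auto-creation is disabled on the broker.
    producer.send(new ProducerRecord("no-such-topic", "k", "v")).get()
  } catch {
    case e: Exception => println(s"send failed: $e")
  } finally producer.close()
}
{noformat}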

 

And dynamically using {{describeConfigs}} to get the broker setting is also not 
easy, because the broker name (which we need in order to call describeConfigs) is 
not kept in the config, and there is no other place in Kafka that checks a broker 
setting before doing something. I'd prefer to keep it as is, because this behavior 
(whether the topic is auto-created during producer.send) applies to all of Kafka, 
not only to connectors.

 

Thanks.


was (Author: showuon):
After investigation, I will improve the logging in this area.

Proposed to add 2 logs:

1. When topic creation is disabled or the topic already exists:

log.debug("The topic creation setting is disabled or the topic name {} is already created. " +
    "If the topic doesn't exist, we'll rely on the auto.create.topics.enable setting in broker side " +
    "to see if the topic can be auto created or not", topic);

2. Before the producer sends the record:

log.trace("{} is going to send record to {}", WorkerSourceTask.this, topic);

 

As for the request to let the user know it's stuck waiting for the destination 
topic to be created: this basically cannot be determined from the client side, 
because producer.send blocks in the waitOnMetadata method, which keeps retrying 
until it times out. There are many possible reasons for this timeout, so it's hard to tell.

 

And dynamically using {{describeConfigs}} to get the broker setting is also not 
easy, because the broker name (which we need in order to call describeConfigs) is 
not kept in the config, and there is no other place in Kafka that checks a broker 
setting before doing something. I'd prefer to keep it as is, because this behavior 
(whether the topic is auto-created during producer.send) applies to all of Kafka, 
not only to connectors.

 

Thanks.

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Arjun Satish
>Assignee: Luke Chen
>Priority: Major
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in 
> AdminClient to check whether the topic exists and whether the broker can 
> auto-create topics ({{auto.create.topics.enable}}), and if neither is the 
> case, throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes an even more specific corner case: 

[GitHub] [kafka] showuon opened a new pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-08-10 Thread GitBox


showuon opened a new pull request #9149:
URL: https://github.com/apache/kafka/pull/9149


Improve the logging in `maybeCreateTopic` to let the user know that if 
`!topicCreation.isTopicCreationRequired(topic)`, we won't create the topic, 
because topic creation is disabled or the topic has already been created. Also, 
we should let the user know that if the topic doesn't exist, we'll rely on the 
`auto.create.topics.enable` setting on the broker side to see whether the topic 
can be auto-created. Otherwise, if `auto.create.topics.enable` is disabled, 
producer.send will get only a `TimeoutException` and no other valuable clue for 
users.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






