[GitHub] [kafka] ttapjinda opened a new pull request #9006: KAFKA-3370 nearest offset reset

2020-07-09 Thread GitBox


ttapjinda opened a new pull request #9006:
URL: https://github.com/apache/kafka/pull/9006


   - Introduce a nearest.offset.reset option on the Consumer, which is only 
consulted when an OffsetOutOfRangeException occurs. 
   When enabled, the offset is reset to the earliest offset if the out-of-range 
offset is not higher than the earliest offset; otherwise it is reset to the 
latest offset.
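
   The rule above can be sketched roughly as follows. This is only an 
illustration, not the code in the pull request: the class name, the method 
name, and the use of plain long offsets are assumptions.

```java
public final class NearestOffsetResetSketch {
    /**
     * Hypothetical helper: picks the reset target for an out-of-range offset.
     * An offset at or below the earliest available offset snaps to earliest;
     * any other out-of-range offset snaps to latest.
     */
    static long resetTarget(final long outOfRange, final long earliest, final long latest) {
        return outOfRange <= earliest ? earliest : latest;
    }

    public static void main(final String[] args) {
        System.out.println(resetTarget(3L, 10L, 100L));   // prints 10
        System.out.println(resetTarget(250L, 10L, 100L)); // prints 100
    }
}
```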
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest

2020-07-09 Thread GitBox


ijuma commented on pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#issuecomment-656511435


   We need this in 2.6 for the reason you stated @ableegoldman 







[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-09 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r452634516



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -239,9 +239,13 @@ class GroupMetadataManager(brokerId: Int,
 }
   }
 
+  /**
+   * @return a map of successfully appended topic partitions and a flag indicating whether the HWM has been
+   * incremented. The caller ought to complete delayed requests for those returned partitions.
+   */
   def storeGroup(group: GroupMetadata,
  groupAssignment: Map[String, Array[Byte]],
- responseCallback: Errors => Unit): Unit = {
+ responseCallback: Errors => Unit): Map[TopicPartition, 
LeaderHWChange] = {

Review comment:
   It's weird to have a method that invokes a callback and returns a 
result. Do we need both? We have a number of other methods that do something 
similar. It would be good to reconsider that as it's difficult to reason about 
usage in such cases.









[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-09 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r452634516



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -65,13 +65,24 @@ import scala.compat.java8.OptionConverters._
 /*
  * Result metadata of a log append operation on the log
  */
-case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = 
None) {
+case class LogAppendResult(info: LogAppendInfo,
+   exception: Option[Throwable] = None,
+   leaderHWChange: LeaderHWChange = 
LeaderHWChange.None) {
   def error: Errors = exception match {
 case None => Errors.NONE
 case Some(e) => Errors.forException(e)
   }
 }
 
+/**
+ * A flag indicating whether the HWM has been changed.
+ */
+sealed trait LeaderHWChange
+object LeaderHWChange {
+  case object LeaderHWIncremented extends LeaderHWChange

Review comment:
   The usual naming convention is to capitalize only the first letter of an 
acronym, e.g. LeaderHwChange.









[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452622160



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -193,6 +196,26 @@ private void closeAndRevive(final Map> taskWith
 log.error("Error suspending corrupted task {} ", task.id(), 
swallow);
 }
 task.closeDirty();
+if (task.isActive()) {
+// Pause so we won't poll any more records for this task until 
it has been re-initialized
+// Note, closeDirty already clears the partitiongroup for the 
task.
+final Set currentAssignment = 
mainConsumer().assignment();
+final Set assignedToPauseAndReset =
+Utils.intersection(HashSet::new, currentAssignment, 
task.inputPartitions());
+
+mainConsumer().pause(assignedToPauseAndReset);
+final Map committed = 
mainConsumer().committed(assignedToPauseAndReset);
+for (final Map.Entry 
committedEntry : committed.entrySet()) {
+final OffsetAndMetadata offsetAndMetadata = 
committedEntry.getValue();
+if (offsetAndMetadata != null) {
+mainConsumer().seek(committedEntry.getKey(), 
offsetAndMetadata);
+
assignedToPauseAndReset.remove(committedEntry.getKey());
+}
+}
+final Set remainder = 
resetter.apply(assignedToPauseAndReset);
+// If anything didn't have a configured policy, reset to 
beginning
+mainConsumer().seekToBeginning(remainder);

Review comment:
   I think we should fail for this case, because if the user configures "none" 
they are requesting that we fail if we lose track of the offset.









[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452621814



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -193,6 +196,26 @@ private void closeAndRevive(final Map> taskWith
 log.error("Error suspending corrupted task {} ", task.id(), 
swallow);
 }
 task.closeDirty();
+if (task.isActive()) {
+// Pause so we won't poll any more records for this task until 
it has been re-initialized
+// Note, closeDirty already clears the partitiongroup for the 
task.
+final Set currentAssignment = 
mainConsumer().assignment();
+final Set assignedToPauseAndReset =
+Utils.intersection(HashSet::new, currentAssignment, 
task.inputPartitions());

Review comment:
   I agree that it should never happen. Maybe it's a test setup issue? 
Should we be worried about masking a potential bug by writing the code like 
this? Or should we rather fail hard (if we assume it would indicate a bug) and 
make sure to update the tests?









[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452620905



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -768,9 +770,30 @@ void runOnce() {
 
 private void resetInvalidOffsets(final InvalidOffsetException e) {
 final Set partitions = e.partitions();
+final Set notReset = resetOffsets(partitions);
+if (!notReset.isEmpty()) {
+final String notResetString =
+notReset.stream()
+.map(tp -> "topic " + tp.topic() + "(partition " + 
tp.partition() + ")")
+.collect(Collectors.joining(","));

Review comment:
   Nit: As reset policy is set on a per topic basis, it's sufficient to 
list the topic names -- it does not add value if we list the partitions, 
because all assigned partitions would be affected anyway.
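
   A rough sketch of the topic-only message, for illustration. The Partition 
class is a hypothetical stand-in for Kafka's TopicPartition so that the 
snippet compiles on its own, and the sorting is an added assumption to keep 
the output stable.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;

public final class NotResetTopicsSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class Partition {
        final String topic;
        final int partition;

        Partition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Lists each affected topic once, dropping the partition numbers.
    static String topicsOnly(final Collection<Partition> notReset) {
        return notReset.stream()
            .map(p -> p.topic)
            .distinct()
            .sorted()
            .collect(Collectors.joining(","));
    }

    public static void main(final String[] args) {
        System.out.println(topicsOnly(Arrays.asList(
            new Partition("orders", 0),
            new Partition("orders", 1),
            new Partition("clicks", 0)))); // prints clicks,orders
    }
}
```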









[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452620343



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -95,7 +96,7 @@
  *  |  | Assigned (3)| <+
  *  |  +-+---+  |
  *  ||  |
- *  ||  |
+ *  ||--+

Review comment:
   cc @vvcephei 









[jira] [Updated] (KAFKA-10262) StateDirectory is not thread-safe

2020-07-09 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-10262:

Description: 
As explicitly stated in the StateDirectory javadocs,  "This class is not 
thread-safe."

Despite this, a single StateDirectory is shared among all the StreamThreads of 
a client. Some of the more "dangerous" methods are indeed synchronized, but 
others are not. For example, the innocent-sounding #directoryForTask is not 
thread-safe and is called in a number of places. We call it during task 
creation, and we call it during task closure (through StateDirectory#lock). 
It's not uncommon for one thread to be closing a task while another is creating 
it after a rebalance.

In fact, we saw exactly that happen in our test application. This ultimately 
led to the following exception:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be created
 at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187)
 at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85)
 at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337)
{code}
 

The exception arises from this line in StateDirectory#directoryForTask:
{code:java}
if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) 
{code}
Presumably, if the taskDir did not exist when the two threads began this 
method, then they would both attempt to create the directory. One of them will 
get there first, leaving the other to return unsuccessfully from mkdir and 
ultimately throw the above ProcessorStateException.
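
One way to tolerate this race without extra locking is to re-check the 
directory after a failed mkdir. The sketch below only illustrates that idea; 
it is not necessarily the fix that will be applied, and the class and method 
names are hypothetical.

```java
import java.io.File;

public final class TaskDirectorySketch {
    /**
     * Returns true if the directory exists once this call completes, whether
     * this thread or a concurrent one created it.
     */
    static boolean ensureDirectory(final File dir) {
        // File#mkdir returns false when the directory already exists, so a thread
        // that loses the race re-checks isDirectory() instead of treating the
        // false return value as a failure.
        return dir.isDirectory() || dir.mkdir() || dir.isDirectory();
    }

    public static void main(final String[] args) {
        final File dir = new File(System.getProperty("java.io.tmpdir"), "task-dir-sketch-" + System.nanoTime());
        System.out.println(ensureDirectory(dir)); // first call creates the directory
        System.out.println(ensureDirectory(dir)); // second call finds it already there
        dir.delete();
    }
}
```

Synchronizing the whole method would be the other obvious option, at the cost 
of serializing callers.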

I've only confirmed that this affects 2.6 so far, but the unsafe methods are 
present in earlier versions. It's possible we made the problem worse somehow 
during "The Refactor" so that it's easier to hit this race condition.

  was:
As explicitly stated in the StateDirectory javadocs,  "This class is not 
thread-safe."

Despite this, a single StateDirectory is shared among all the StreamThreads of 
a client. Some of the more "dangerous" methods are indeed synchronized, but 
others are not. For example, the innocent-sounding #directoryForTask is not 
thread-safe and is called in a number of places. We call it during task 
creation, and we call it during task closure when we check 
`directoryForTaskIsEmpty`. It's not uncommon for one thread to be closing a 
task while another is creating it after a rebalance.

In fact, we saw exactly that happen in our test application. This ultimately 
led to the following exception:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be created
 at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
 at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187)
 at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85)
 at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337)
{code}
 

The exception arises from this line in StateDirectory#directoryForTask:
{code:java}
if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) 
{code}
Presumably, if the taskDir did not exist when the two threads began this 
method, then they would both attempt to create the directory. One of them will 
get there first, leaving the other to return unsuccessfully from mkdir and 
ultimately throw the above ProcessorStateException.

I've only confirmed that this affects 2.6 so far, but the unsafe methods are 
present in earlier versions. It's possible we made the problem worse somehow 
during "The Refactor" so that it's easier to hit this race condition.


> StateDirectory is not thread-safe
> -
>
> Key: KAFKA-10262
> URL: https://issues.apache.org/jira/browse/KAFKA-10262
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> As explicitly stated in the StateDirectory javadocs,  "This class is not 
> thread-safe."
> Despite this, a single StateDirectory is shared among all the StreamThreads 
> of a client. Some of the more "dangerous" methods are indeed synchronized, 
> but others are not. For example, the innocent-sounding #directoryForTask is 
> not thread-safe and is called in a number of places. We call it during task 
> creation, and we call it during task closure (throug

[GitHub] [kafka] vvcephei commented on pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims

2020-07-09 Thread GitBox


vvcephei commented on pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#issuecomment-656446145


   Note, I didn't bother testing the shims directly, since they are very 
effectively tested by literally all the tests in Streams. 







[GitHub] [kafka] vvcephei commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims

2020-07-09 Thread GitBox


vvcephei commented on a change in pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#discussion_r452588767



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseShim.java
##
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class ProcessorContextReverseShim implements 
InternalProcessorContext {
+final InternalApiProcessorContext delegate;
+
+static InternalProcessorContext shim(final 
InternalApiProcessorContext delegate) {
+if (delegate instanceof ProcessorContextShim) {
+return ((ProcessorContextShim) delegate).delegate;

Review comment:
   You'll see this block in all the shims. There are times when the 
internal code would wind up converting new to old and then back to new. This 
block prevents us from jumping through multiple layers in that case.
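
   A stripped-down sketch of that unwrapping pattern, for illustration only. 
The two interfaces and the shim class names are hypothetical stand-ins, not 
the types in this PR.

```java
public final class ShimSketch {
    // Hypothetical "old" and "new" API surfaces with the same single method.
    interface OldContext { String taskName(); }
    interface NewContext { String taskName(); }

    // Adapts an old-style context to the new interface.
    static final class OldToNewShim implements NewContext {
        final OldContext delegate;

        private OldToNewShim(final OldContext delegate) {
            this.delegate = delegate;
        }

        static NewContext shim(final OldContext delegate) {
            // If the old-style object is itself a wrapper around a new-style one,
            // hand back the original instead of stacking a second layer.
            if (delegate instanceof NewToOldShim) {
                return ((NewToOldShim) delegate).delegate;
            }
            return new OldToNewShim(delegate);
        }

        @Override
        public String taskName() {
            return delegate.taskName();
        }
    }

    // Adapts a new-style context to the old interface, with the mirror-image check.
    static final class NewToOldShim implements OldContext {
        final NewContext delegate;

        private NewToOldShim(final NewContext delegate) {
            this.delegate = delegate;
        }

        static OldContext shim(final NewContext delegate) {
            if (delegate instanceof OldToNewShim) {
                return ((OldToNewShim) delegate).delegate;
            }
            return new NewToOldShim(delegate);
        }

        @Override
        public String taskName() {
            return delegate.taskName();
        }
    }
}
```

   Converting new to old and back to new then yields the original object 
rather than a wrapper around a wrapper.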

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+/**
+ * Processor context interface.
+ *
+ * @param > a bound on the types of keys that may be forwarded
+ * @param > a bound on the types of values that may be forwarded
+ */
+public interface ProcessorContext {

Review comment:
   I'll have to update the KIP. Replacing ProcessorContext instead of just 
adding the generic parameters is going to avoid the Scala compatibility issue 
we faced last time.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorShim.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See

[GitHub] [kafka] vvcephei commented on pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


vvcephei commented on pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#issuecomment-656441714


   Hi @abbccdda @ableegoldman @mjsax ,
   
   I've rebased on trunk to resolve conflicts with Sophie's parallel fix. I 
also dropped a few unnecessary side-fixes. Do you mind giving this a final pass?







[jira] [Commented] (KAFKA-10249) In-memory stores are skipped when checkpointing but not skipped when reading the checkpoint

2020-07-09 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17155039#comment-17155039
 ] 

John Roesler commented on KAFKA-10249:
--

cc [~rhauch] , I've just pushed the fix to 2.6.

> In-memory stores are skipped when checkpointing but not skipped when reading 
> the checkpoint
> ---
>
> Key: KAFKA-10249
> URL: https://issues.apache.org/jira/browse/KAFKA-10249
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> As the title suggests, offsets for in-memory stores (including the 
> suppression buffer) are not written to the checkpoint file. However, when 
> reading from the checkpoint file during task initialization, we do not check 
> StateStore#persistent. We attempt to look up the offsets for in-memory stores 
> in the checkpoint file, and obviously do not find them.
> With eos we have to conclude that the existing state is dirty and thus throw 
> a TaskCorruptedException. So pretty much any task with in-memory state will 
> always hit this exception when reinitializing from the checkpoint, forcing it 
> to clear the entire state directory and build up all of its state again from 
> scratch (both persistent and in-memory).
> This is especially unfortunate for KIP-441, as we will hit this any time a 
> task is moved from one thread to another.
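
The mismatch described in the issue can be illustrated with a simplified 
sketch. It is not the Streams code: the class and method names are 
hypothetical, and offsets are keyed by store name here purely for brevity.

```java
import java.util.HashMap;
import java.util.Map;

public final class CheckpointLookupSketch {
    /**
     * @param persistentByStore store name mapped to whether the store is persistent
     * @param checkpointed      offsets read from the checkpoint file (assumed keyed by store name)
     * @return true if some persistent store has no checkpointed offset
     */
    static boolean isCorrupted(final Map<String, Boolean> persistentByStore,
                               final Map<String, Long> checkpointed) {
        for (final Map.Entry<String, Boolean> store : persistentByStore.entrySet()) {
            if (!store.getValue()) {
                // In-memory stores are never written to the checkpoint,
                // so a missing offset says nothing about their state.
                continue;
            }
            if (!checkpointed.containsKey(store.getKey())) {
                return true;
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        final Map<String, Boolean> stores = new HashMap<>();
        stores.put("rocksdb-store", true);
        stores.put("suppress-buffer", false);
        final Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put("rocksdb-store", 42L);
        System.out.println(isCorrupted(stores, checkpoint)); // prints false
    }
}
```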



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10249) In-memory stores are skipped when checkpointing but not skipped when reading the checkpoint

2020-07-09 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10249:
-
Fix Version/s: 2.6.0

> In-memory stores are skipped when checkpointing but not skipped when reading 
> the checkpoint
> ---
>
> Key: KAFKA-10249
> URL: https://issues.apache.org/jira/browse/KAFKA-10249
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> As the title suggests, offsets for in-memory stores (including the 
> suppression buffer) are not written to the checkpoint file. However, when 
> reading from the checkpoint file during task initialization, we do not check 
> StateStore#persistent. We attempt to look up the offsets for in-memory stores 
> in the checkpoint file, and obviously do not find them.
> With eos we have to conclude that the existing state is dirty and thus throw 
> a TaskCorruptedException. So pretty much any task with in-memory state will 
> always hit this exception when reinitializing from the checkpoint, forcing it 
> to clear the entire state directory and build up all of its state again from 
> scratch (both persistent and in-memory).
> This is especially unfortunate for KIP-441, as we will hit this any time a 
> task is moved from one thread to another.





[GitHub] [kafka] vvcephei merged pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


vvcephei merged pull request #8996:
URL: https://github.com/apache/kafka/pull/8996


   







[GitHub] [kafka] vvcephei removed a comment on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


vvcephei removed a comment on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656435203


   Unrelated test failures:
   
   * kafka.api.TransactionsTest.testBumpTransactionalEpoch
   * 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   * 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1







[GitHub] [kafka] vvcephei commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


vvcephei commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656435203


   Unrelated test failures:
   
   * kafka.api.TransactionsTest.testBumpTransactionalEpoch
   * 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   * 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1







[GitHub] [kafka] showuon commented on pull request #8885: KAFKA-8264: decrease the record size for flaky test

2020-07-09 Thread GitBox


showuon commented on pull request #8885:
URL: https://github.com/apache/kafka/pull/8885#issuecomment-656432089


   @hachikuji, could you help review this small PR to fix a flaky test? Thanks.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9005: KAFKA-10263: Do not assign standby for revoking stateless tasks

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #9005:
URL: https://github.com/apache/kafka/pull/9005#discussion_r452570044



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1402,9 +1404,13 @@ public void 
shouldTriggerImmediateRebalanceOnTasksRevoked() {
 final Map assignment = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
 // Verify at least one partition was revoked
-assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks));
+assertThat(assignment.get(CONSUMER_1).partitions(), 
not(allPartitions));

Review comment:
   Whoops









[GitHub] [kafka] ableegoldman commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656424125


   Three flaky test failures:
   `MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1`
   `TransactionsTest.testBumpTransactionalEpoch`
   `EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`







[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-07-09 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17155024#comment-17155024
 ] 

Sophie Blee-Goldman commented on KAFKA-10255:
-

Failed again 
h3. Stacktrace

java.lang.AssertionError: consumer record size is not zero expected:<0> but was:<3>
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.failNotEquals(Assert.java:835)
 at org.junit.Assert.assertEquals(Assert.java:647)
 at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)

> Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
> 
>
> Key: KAFKA-10255
> URL: https://issues.apache.org/jira/browse/KAFKA-10255
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 STARTED
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 FAILED
>  java.lang.AssertionError: consumer record size is not zero expected:<0> but 
> was:<2>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10264) Flaky Test TransactionsTest.testBumpTransactionalEpoch

2020-07-09 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10264:
---

 Summary: Flaky Test TransactionsTest.testBumpTransactionalEpoch
 Key: KAFKA-10264
 URL: https://issues.apache.org/jira/browse/KAFKA-10264
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Sophie Blee-Goldman


h3. Stacktrace

java.lang.AssertionError: Unexpected exception cause 
org.apache.kafka.common.KafkaException: The client hasn't received 
acknowledgment for some previously sent messages and can no longer retry them. 
It is safe to abort the transaction and continue.
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.assertTrue(Assert.java:42)
 at org.apache.kafka.test.TestUtils.assertFutureThrows(TestUtils.java:557)
 at 
kafka.api.TransactionsTest.testBumpTransactionalEpoch(TransactionsTest.scala:637)





[GitHub] [kafka] guozhangwang commented on a change in pull request #9005: KAFKA-10263: Do not assign standby for revoking stateless tasks

2020-07-09 Thread GitBox


guozhangwang commented on a change in pull request #9005:
URL: https://github.com/apache/kafka/pull/9005#discussion_r452562468



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1402,9 +1404,13 @@ public void 
shouldTriggerImmediateRebalanceOnTasksRevoked() {
 final Map assignment = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
 // Verify at least one partition was revoked
-assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks));
+assertThat(assignment.get(CONSUMER_1).partitions(), 
not(allPartitions));
 assertThat(assignment.get(CONSUMER_2).partitions(), 
equalTo(emptyList()));
 
+// Verify that stateless revoked tasks would not be assigned as 
standbys
+
assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).activeTasks(),
 equalTo(emptyList()));
+
assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).standbyTasks(),
 equalTo(emptyMap()));

Review comment:
   Verified that without the fix, this line will fail.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1402,9 +1404,13 @@ public void 
shouldTriggerImmediateRebalanceOnTasksRevoked() {
 final Map assignment = 
partitionAssignor.assign(metadata, new 
GroupSubscription(subscriptions)).groupAssignment();
 
 // Verify at least one partition was revoked
-assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks));
+assertThat(assignment.get(CONSUMER_1).partitions(), 
not(allPartitions));

Review comment:
   I found it was always true because we are comparing a list of tasks with 
a list of partitions ;)
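
To illustrate why the old assertion could never fail: two non-empty lists with different element types are never equal, so a "not equal" check between them passes regardless of what was assigned. The sketch below uses plain `List.equals` rather than Hamcrest, and the `Task` and `Partition` classes are hypothetical stand-ins, not the Kafka types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

class VacuousAssertionSketch {
    // Hypothetical stand-in for a Streams task id (not the Kafka TaskId class).
    static final class Task {
        final int subtopology;
        final int partition;

        Task(int subtopology, int partition) {
            this.subtopology = subtopology;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Task)) return false;
            Task other = (Task) o;
            return subtopology == other.subtopology && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(subtopology, partition);
        }
    }

    // Hypothetical stand-in for a topic partition.
    static final class Partition {
        final String topic;
        final int partition;

        Partition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Partition)) return false;
            Partition other = (Partition) o;
            return topic.equals(other.topic) && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    static List<Task> allTasks() {
        return Arrays.asList(new Task(0, 0), new Task(0, 1));
    }

    static List<Partition> allPartitions() {
        return Arrays.asList(new Partition("topic1", 0), new Partition("topic1", 1));
    }

    // Same idea as assertThat(actual, not(expected)): passes when the lists differ.
    static boolean passesNotEqualCheck(List<?> actual, List<?> expected) {
        return !actual.equals(expected);
    }

    public static void main(String[] args) {
        // Suppose nothing was revoked: the consumer still holds every partition.
        List<Partition> assigned = allPartitions();

        // Comparing partitions with tasks passes even though nothing was revoked.
        System.out.println(passesNotEqualCheck(assigned, allTasks()));      // true

        // Comparing partitions with partitions catches the problem.
        System.out.println(passesNotEqualCheck(assigned, allPartitions())); // false
    }
}
```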









[GitHub] [kafka] guozhangwang opened a new pull request #9005: KAFKA-10263: Do not assign standby for revoking stateless tasks

2020-07-09 Thread GitBox


guozhangwang opened a new pull request #9005:
URL: https://github.com/apache/kafka/pull/9005


   Also piggy-backs a small fix to use TreeMap rather than HashMap to preserve 
iteration ordering.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-10263) Do not create standbys for those revoking active tasks if it is not stateful

2020-07-09 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10263:
-

 Summary: Do not create standbys for those revoking active tasks if 
it is not stateful
 Key: KAFKA-10263
 URL: https://issues.apache.org/jira/browse/KAFKA-10263
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang
Assignee: Guozhang Wang


Today in StreamsPartitionAssignor, if an intended active task is not yet 
revoked from the old owner, we do not give it to the newly assigned owner. 
Instead we assign it as a standby task to the new owner to let it 
start restoring a bit early.

However, if that task is not stateful, then there's no point trying to let it 
restore at all. This should be avoided in the assignor.
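
The intended rule can be sketched as a small decision function. This is a simplified illustration under assumed names, not the StreamsPartitionAssignor code: the class, the `Placement` enum, and `placeOnNewOwner` are all hypothetical.

```java
class StandbyForRevokingTaskSketch {
    // What the new owner receives for a task in this rebalance (hypothetical enum).
    enum Placement { ACTIVE, STANDBY, NONE }

    /**
     * Decide what the newly chosen owner gets for one task.
     *
     * @param stillOwnedByPreviousOwner true if the old owner has not yet revoked the task
     * @param stateful                  true if the task has state to restore
     */
    static Placement placeOnNewOwner(boolean stillOwnedByPreviousOwner, boolean stateful) {
        if (!stillOwnedByPreviousOwner) {
            // Nothing blocks the hand-over, so the task can become active right away.
            return Placement.ACTIVE;
        }
        // The task must wait for revocation. A standby only helps if there is
        // state to warm up; a stateless task has nothing to restore.
        return stateful ? Placement.STANDBY : Placement.NONE;
    }

    public static void main(String[] args) {
        System.out.println(placeOnNewOwner(false, true));  // ACTIVE
        System.out.println(placeOnNewOwner(true, true));   // STANDBY
        System.out.println(placeOnNewOwner(true, false));  // NONE
    }
}
```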





[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement KIP-584 write path

2020-07-09 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r452398464



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##
@@ -0,0 +1,26 @@
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends 
AbstractOptions {
+private boolean shouldUseControllerAsDestination = false;
+
+public DescribeFeaturesOptions shouldUseControllerAsDestination(boolean 
shouldUse) {
+shouldUseControllerAsDestination = shouldUse;
+return this;
+}
+
+public boolean shouldUseControllerAsDestination() {

Review comment:
   remove word 'should'

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,10 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+UpdateFinalizedFeaturesResult updateFinalizedFeatures(Set 
featureUpdates, UpdateFinalizedFeaturesOptions options);

Review comment:
   add doc

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
##
@@ -0,0 +1,26 @@
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class DescribeFeaturesOptions extends 
AbstractOptions {
+private boolean shouldUseControllerAsDestination = false;
+
+public DescribeFeaturesOptions shouldUseControllerAsDestination(boolean 
shouldUse) {

Review comment:
   add doc

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3979,6 +3986,98 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions 
options) {

Review comment:
   1. add test code in `KafkaAdminClientTest`
   2. final variable names

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3979,6 +3986,98 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions 
options) {
+final KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+final long now = time.milliseconds();
+Call callViaLeastLoadedNode = new Call("describeFeatures", 
calcDeadlineMs(now, options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+
+@Override
+ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+return new ApiVersionsRequest.Builder();
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+future.complete(
+new FeatureMetadata(
+apiVersionsResponse.finalizedFeatures(),
+apiVersionsResponse.finalizedFeaturesEpoch(),
+apiVersionsResponse.supportedFeatures()));
+} else {
+future.completeExceptionally(
+
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+completeAllExceptionally(Collections.singletonList(future), 
throwable);
+}
+};
+
+Call call = callViaLeastLoadedNode;
+if (options.shouldUseControllerAsDestination()) {
+call = new Call("describeFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+new ControllerNodeProvider()) {
+
+@Override
+ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+return (ApiVersionsRequest.Builder) 
callViaLeastLoadedNode.createRequest(timeoutMs);
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+callViaLeastLoadedNode.handleResponse(response);
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+callViaLeastLoadedNode.handleFailure(throwable);
+}
+};
+}
+runnable.call(call, now);
+return new DescribeFeaturesResult(future);
+}
+
+ 

[GitHub] [kafka] abbccdda commented on a change in pull request #9002: MINOR: Add ApiMessageTypeGenerator

2020-07-09 Thread GitBox


abbccdda commented on a change in pull request #9002:
URL: https://github.com/apache/kafka/pull/9002#discussion_r452542243



##
File path: 
generator/src/main/java/org/apache/kafka/message/TypeClassGenerator.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.message;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+
+public interface TypeClassGenerator {
+/**
+ * The short name of the type class file we are generating. For example,
+ * ApiMessageType.java.
+ */
+String outputName();
+
+/**
+ * Registers a message spec with the generator.
+ *
+ * @param spec  The spec to register.
+ */
+void registerMessageType(MessageSpec spec);
+
+/**
+ * Write out the internal state.

Review comment:
   Generate the type and write it out to the internal state.

##
File path: 
generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
##
@@ -267,10 +283,10 @@ public static void main(String[] args) throws Exception {
 if (args.length == 0) {
 System.out.println(USAGE);
 System.exit(0);
-} else if (args.length != 3) {
+} else if (args.length != 4) {

Review comment:
   Why do we need a separate exit code for misused arguments?

##
File path: 
generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
##
@@ -169,20 +185,20 @@ public static void processDirectories(String packageName, 
String outputDir, Stri
 generator.write(writer);
 }
 numProcessed++;
-messageTypeGenerator.registerMessageType(spec);
+if (typeClassGenerator != null) {
+typeClassGenerator.registerMessageType(spec);
+}
 } catch (Exception e) {
 throw new RuntimeException("Exception while processing " + 
inputPath.toString(), e);
 }
 }
 }
-if (messageTypeGenerator.hasRegisteredTypes()) {
-Path factoryOutputPath = Paths.get(outputDir, 
API_MESSAGE_TYPE_JAVA);
-outputFileNames.add(API_MESSAGE_TYPE_JAVA);
+if (typeClassGenerator != null) {

Review comment:
   Instead of using a null pointer, maybe we could add a `NoOpTypeClassGenerator` 
whose `generateAndWrite` does nothing, and remove `outputName()` so that there 
is one single API. Something like
   ```
   String outputFileName = typeClassGenerator.generateAndWrite(outputDir, 
writer);
   ```
   I don't feel strongly about this change. Ideally it would look cleaner, but 
it's up to you.
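
   For reference, a minimal sketch of that null-object idea. Everything here is an assumption for illustration: the interface is reduced to string spec names, `generateAndWrite` takes only a writer and returns an `Optional` file name, and `ListingTypeClassGenerator` is a made-up stand-in for a real generator.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class NoOpGeneratorSketch {
    // Simplified, hypothetical version of the generator interface.
    interface TypeClassGenerator {
        void registerMessageType(String specName);

        // Returns the name of the file written, or empty if nothing was generated.
        Optional<String> generateAndWrite(BufferedWriter writer) throws IOException;
    }

    // Null object: accepts registrations and produces no output.
    static final class NoOpTypeClassGenerator implements TypeClassGenerator {
        @Override
        public void registerMessageType(String specName) { }

        @Override
        public Optional<String> generateAndWrite(BufferedWriter writer) {
            return Optional.empty();
        }
    }

    // Made-up generator that just lists the registered spec names.
    static final class ListingTypeClassGenerator implements TypeClassGenerator {
        private final List<String> names = new ArrayList<>();

        @Override
        public void registerMessageType(String specName) {
            names.add(specName);
        }

        @Override
        public Optional<String> generateAndWrite(BufferedWriter writer) throws IOException {
            for (String name : names) {
                writer.write(name);
                writer.newLine();
            }
            writer.flush();
            return Optional.of("ApiMessageType.java");
        }
    }

    // The caller needs no null checks: both generators go through the same path.
    static Optional<String> run(TypeClassGenerator generator, List<String> specNames, StringWriter out) {
        for (String name : specNames) {
            generator.registerMessageType(name);
        }
        try (BufferedWriter writer = new BufferedWriter(out)) {
            return generator.generateAndWrite(writer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> specs = Arrays.asList("FetchRequest", "ProduceRequest");
        System.out.println(run(new NoOpTypeClassGenerator(), specs, new StringWriter()).isPresent());
        System.out.println(run(new ListingTypeClassGenerator(), specs, new StringWriter()).isPresent());
    }
}
```

   The trade-off is one extra class in exchange for dropping both `!= null` branches in `processDirectories`.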









[jira] [Created] (KAFKA-10262) StateDirectory is not thread-safe

2020-07-09 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10262:
---

 Summary: StateDirectory is not thread-safe
 Key: KAFKA-10262
 URL: https://issues.apache.org/jira/browse/KAFKA-10262
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Sophie Blee-Goldman


As explicitly stated in the StateDirectory javadocs,  "This class is not 
thread-safe."

Despite this, a single StateDirectory is shared among all the StreamThreads of 
a client. Some of the more "dangerous" methods are indeed synchronized, but 
others are not. For example, the innocent-sounding #directoryForTask is not 
thread-safe and is called in a number of places. We call it during task 
creation, and we call it during task closure when we check 
`directoryForTaskIsEmpty`. It's not uncommon for one thread to be closing a 
task while another is creating it after a rebalance.

In fact, we saw exactly that happen in our test application. This ultimately 
led to the following exception:

 
{code:java}
org.apache.kafka.streams.errors.ProcessorStateException: task directory 
[/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be 
created
 at 
org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
 at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187)
 at 
org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85)
 at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337)
{code}
 

The exception arises from this line in StateDirectory#directoryForTask:
{code:java}
if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) 
{code}
Presumably, if the taskDir did not exist when the two threads began this 
method, then they would both attempt to create the directory. One of them will 
get there first, leaving the other to return unsuccessfully from mkdir and 
ultimately throw the above ProcessorStateException.

I've only confirmed that this affects 2.6 so far, but the unsafe methods are 
present in earlier versions. It's possible we made the problem worse somehow 
during "The Refactor" so that it's easier to hit this race condition.
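
The check-then-create race described above can be reproduced and avoided in a few lines. The following is a sketch only, not the StateDirectory code: the class and method names are hypothetical, and treating "mkdir failed but the directory now exists" as success is one possible fix among several (synchronizing the method would be another).

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class TaskDirRaceSketch {
    // Same shape as the reported check: a thread that passes exists() == false
    // but loses the mkdir() race gets false, although the directory is there.
    static boolean racyEnsure(File dir) {
        return dir.exists() || dir.mkdir();
    }

    // Tolerant variant: a failed mkdir() is fine if the directory exists afterwards.
    static boolean tolerantEnsure(File dir) {
        return dir.mkdir() || dir.isDirectory();
    }

    // Starts the given number of threads at once, each ensuring the same task
    // directory, and returns how many of them reported failure.
    static int tolerantFailuresUnderContention(int threads) {
        final File parent;
        try {
            parent = Files.createTempDirectory("state-dir-sketch").toFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        final File taskDir = new File(parent, "1_0");
        final CountDownLatch start = new CountDownLatch(1);
        final AtomicInteger failures = new AtomicInteger();
        final Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up to maximize contention
                    if (!tolerantEnsure(taskDir)) {
                        failures.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    failures.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Clean up the temporary directories created for the demo.
            taskDir.delete();
            parent.delete();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures: " + tolerantFailuresUnderContention(8));
    }
}
```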





[GitHub] [kafka] vvcephei opened a new pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims

2020-07-09 Thread GitBox


vvcephei opened a new pull request #9004:
URL: https://github.com/apache/kafka/pull/9004


   Adds the new Processor and ProcessorContext interfaces
   as proposed in KIP-478. To integrate in a staged fashion
   with the code base, shims are included to convert back
   and forth between the new and old APIs.
   
   ProcessorNode is converted to the new APIs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-10261) Introduce the KIP-478 processors with shims

2020-07-09 Thread John Roesler (Jira)
John Roesler created KAFKA-10261:


 Summary: Introduce the KIP-478 processors with shims
 Key: KAFKA-10261
 URL: https://issues.apache.org/jira/browse/KAFKA-10261
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: John Roesler
Assignee: John Roesler








[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-09 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-656382338


   > Do you want to rebase again so that I can run system tests one more time?
   
   sure!







[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-07-09 Thread GitBox


apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r452507990



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1308,18 +1410,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   val value = maxConnections(configs)
   if (value <= 0)
 throw new ConfigException("Invalid max.connections $listenerMax")

Review comment:
   yes, good catch. Fixed.









[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-09 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-656365594


   @chia7712 : All 3 PRs you fixed above have been merged. Do you want to 
rebase again so that I can run system tests one more time?







[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-07-09 Thread GitBox


apovzner commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r452502811



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) 
extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
 
maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation 
rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window 
size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, 
timeMs: Long): Long = {
+val listenerThrottleTimeMs = maxConnectionsPerListener
+  .get(listenerName)
+  .map(listenerQuota => 
recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+  .getOrElse(0)
+
+if (protectedListener(listenerName)) {
+  listenerThrottleTimeMs
+} else {
+  val brokerThrottleTimeMs = 
recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+  math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)
+}
+  }
+
+  private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = {
+try {
+  sensor.record(1.0, timeMs)
+  0
+} catch {
+  case e: QuotaViolationException =>
+val throttleTimeMs = QuotaUtils.boundedThrottleTime(
+  e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), 
maxThrottleTimeMs).toInt
+debug(s"Quota violated for sensor (${sensor.name}). Delay time: 
$throttleTimeMs ms")
+throttleTimeMs
+}
+  }
+
+  /**
+   * Creates sensor for tracking the connection creation rate and 
corresponding connection rate quota for a given
+   * listener or broker-wide, if listener is not provided.
+   * @param quotaLimit connection creation rate quota
+   * @param listenerOpt listener name if sensor is for a listener
+   */
+  private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: 
Option[String] = None): Sensor = {
+val quotaEntity = listenerOpt.getOrElse("broker")
+val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", 
rateQuotaMetricConfig(quotaLimit))
+sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false)
+info(s"Created ConnectionCreationRate-$quotaEntity sensor, 
quotaLimit=$quotaLimit")
+sensor
+  }
+
+  private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: 
Option[String] = None): Unit = {
+val metric = metrics.metric(connectionRateMetricName((listenerOpt)))
+metric.config(rateQuotaMetricConfig(quotaLimit))
+info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation 
rate to $quotaLimit")
+  }
+
+  private def connectionRateMetricName(listenerOpt: Option[String]): 
MetricName = {
+val quotaEntity = listenerOpt.getOrElse("broker")
+metrics.metricName(
+  s"connection-creation-rate-$quotaEntity",
+  "connection-quota-no-jmx",
+  s"Tracking $quotaEntity connection creation rate",
+  rateQuotaMetricTags(listenerOpt))
+  }
+
+  private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = {
+new MetricConfig()
+  .timeWindow(config.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS)
+  .samples(config.numQuotaSamples)
+  .quota(new Quota(quotaLimit, true))
+  }
+
+  private def rateQuotaMetricTags(listenerOpt: Option[String]): 
util.Map[String, String] = {
+val tags = new util.LinkedHashMap[String, String]
+listenerOpt.foreach(listener => tags.put("listener", listener))
+tags

Review comment:
   I realized that I don't need tags here anymore, since the name of the 
metric contains the name of the listener, so the metrics are already distinct 
per listener (and broker-wide). I removed that method altogether.









[jira] [Updated] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop

2020-07-09 Thread Konstantin Lalafaryan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Lalafaryan updated KAFKA-10253:
--
Description: 
Hello everyone!

 

We are running a kafka-connect cluster (3 workers), and it very often gets into 
an infinite rebalance loop.

 
{code:java}
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Rebalance started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Was selected to perform assignments, but do not have latest 
config found in sync request. Returning an empty configuration to trigger 
re-sync. 
(org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Successfully joined group with generation 305655831 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Joined group at generation 305655831 with protocol version 2 and 
got assignment: Assignment{error=1, 
leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', 
leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], 
revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Rebalance started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Was selected to perform assignments, but do not have latest 
config found in sync request. Returning an empty configuration to trigger 
re-sync. 
(org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Successfully joined group with generation 305655832 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Joined group at generation 305655832 with protocol version 2 and 
got assignment: Assignment{error=1, 
leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', 
leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], 
revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Rebalance started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Was selected to perform assignments, but do not have latest 
config found in sync request. Returning an empty configuration to trigger 
re-sync. 
(org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Successfully joined group with generation 305655833 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Joined group at generation 305655833 with protocol version 2 and 
got assignment: Assignment{error=1, 
leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', 
leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], 
revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder) 
[DistributedHerder-connect-1-1]
2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= 
kafka-connect] Rebalance started 
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
[DistributedHerder-connect-1-1]
2020-07-09 

[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452494073



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -104,26 +104,27 @@ static void closeStateManager(final Logger log,
         if (stateDirectory.lock(id)) {
             try {
                 stateMgr.close();
-
-                if (wipeStateStore) {
-                    log.debug("Wiping state stores for {} task {}", taskType, id);
-                    // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
-                    // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
-                    // need to re-bootstrap the restoration from the beginning
-                    Utils.delete(stateMgr.baseDir());
-                }
             } catch (final ProcessorStateException e) {
                 firstException.compareAndSet(null, e);
             } finally {
-                stateDirectory.unlock(id);
+                try {
+                    if (wipeStateStore) {
+                        log.debug("Wiping state stores for {} task {}", taskType, id);
+                        // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
+                        // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
+                        // need to re-bootstrap the restoration from the beginning
+                        Utils.delete(stateMgr.baseDir());
+                    }
+                } finally {
+                    stateDirectory.unlock(id);
+                }

Review comment:
   IMHO, the code is good as-is.
   
   Thanks for rewriting to a nested `try-finally` structure!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452491173



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -104,26 +104,27 @@ static void closeStateManager(final Logger log,
         if (stateDirectory.lock(id)) {
             try {
                 stateMgr.close();
-
-                if (wipeStateStore) {
-                    log.debug("Wiping state stores for {} task {}", taskType, id);
-                    // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
-                    // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
-                    // need to re-bootstrap the restoration from the beginning
-                    Utils.delete(stateMgr.baseDir());
-                }
             } catch (final ProcessorStateException e) {
                 firstException.compareAndSet(null, e);
             } finally {
-                stateDirectory.unlock(id);
+                try {
+                    if (wipeStateStore) {
+                        log.debug("Wiping state stores for {} task {}", taskType, id);
+                        // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
+                        // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
+                        // need to re-bootstrap the restoration from the beginning
+                        Utils.delete(stateMgr.baseDir());
+                    }
+                } finally {
+                    stateDirectory.unlock(id);
+                }

Review comment:
   I can't use `ExceptionUtils#executeAll` because the compiler complains 
that we don't handle the `IOException` unless we surround each Runnable with 
its own try-catch block, at which point `#executeAll` isn't really doing 
anything
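
   The compiler complaint described above can be reproduced with a minimal,
   self-contained sketch. Everything here is an assumption made for
   illustration: `CheckedRunnableDemo`, its `executeAll`, `deleteDir` and
   `unlock` are hypothetical stand-ins modeled only on the description in
   this comment, not the real `ExceptionUtils#executeAll`, whose signature may
   differ. The point is only that `Runnable.run()` declares no checked
   exceptions, so each task needs its own try-catch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class CheckedRunnableDemo {
    public static boolean unlocked = false;

    // Hypothetical helper: runs every task, then rethrows the first failure.
    public static void executeAll(Runnable... tasks) {
        RuntimeException first = null;
        for (Runnable task : tasks) {
            try {
                task.run();
            } catch (RuntimeException e) {
                if (first == null) {
                    first = e;
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    // Stand-in for a cleanup step that fails with a checked exception.
    static void deleteDir() throws IOException {
        throw new IOException("delete failed");
    }

    // Stand-in for releasing the lock; declared to throw a checked exception too.
    static void unlock() throws IOException {
        unlocked = true;
    }

    public static void closeAll() {
        // executeAll(CheckedRunnableDemo::deleteDir) would not compile, because
        // Runnable.run() cannot throw IOException. Each task wraps it instead.
        executeAll(
            () -> {
                try {
                    deleteDir();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            },
            () -> {
                try {
                    unlock();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        );
    }

    public static void main(String[] args) {
        try {
            closeAll();
        } catch (UncheckedIOException e) {
            System.out.println("first failure: " + e.getCause().getMessage()
                + ", unlocked=" + unlocked);
        }
    }
}
```

   With the per-task wrapping in place the helper adds little beyond what a
   nested try-finally already gives, which matches the observation above.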









[GitHub] [kafka] vvcephei commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


vvcephei commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656349040


   It looks like the tests never ran before.







[GitHub] [kafka] vvcephei commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


vvcephei commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656348955


   Retest this please







[GitHub] [kafka] vvcephei commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


vvcephei commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452487075



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -222,8 +222,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
             log.trace("Loaded offsets from the checkpoint file: {}", loadedCheckpoints);
 
             for (final StateStoreMetadata store : stores.values()) {
+                if (store.corrupted) {
+                    log.error("Tried to initialize store offsets for corrupted store {}", store);
+                    throw new IllegalStateException("Should not initialize offsets for a corrupted task");
+                }
+
                 if (store.changelogPartition == null) {
                     log.info("State store {} is not logged and hence would not be restored", store.stateStore.name());
+                } else if (!store.stateStore.persistent()) {

Review comment:
   Fair enough.









[GitHub] [kafka] JimGalasyn commented on a change in pull request #8995: Restore stream-table duality description

2020-07-09 Thread GitBox


JimGalasyn commented on a change in pull request #8995:
URL: https://github.com/apache/kafka/pull/8995#discussion_r452486723



##
File path: docs/streams/core-concepts.html
##
@@ -170,13 +150,59 @@ Duality of
   or to run interactive
 queries
   against your application's latest processing results. And, beyond its 
internal usage, the Kafka Streams API
   also allows developers to exploit this duality in their own applications.
-  
+
 
-  
+
   Before we discuss concepts such as aggregations
   in Kafka Streams, we must first introduce tables in 
more detail, and talk about the aforementioned stream-table duality.
-  Essentially, this duality means that a stream can be viewed as a table, 
and a table can be viewed as a stream.
-  
+  Essentially, this duality means that a stream can be viewed as a table, 
and a table can be viewed as a stream. Kafka's log compaction feature, for 
example, exploits this duality.
+
+
+
+A simple form of a table is a collection of key-value pairs, also 
called a map or associative array. Such a table may look as follows:
+
+
+
+The stream-table duality describes the close relationship between 
streams and tables.
+
+Stream as Table: A stream can be considered a changelog of 
a table, where each data record in the stream captures a state change of the 
table. A stream is thus a table in disguise, and it can be easily turned into a 
"real" table by replaying the changelog from beginning to end to reconstruct 
the table. Similarly, in a more general analogy, aggregating data records in a 
stream - such as computing the total number of pageviews by user from a stream 
of pageview events - will return a table (here with the key and the value being 
the user and its corresponding pageview count, respectively).
+Table as Stream: A table can be considered a snapshot, at a 
point in time, of the latest value for each key in a stream (a stream's data 
records are key-value pairs). A table is thus a stream in disguise, and it can 
be easily turned into a "real" stream by iterating over each key-value entry in 
the table.
+
+
+
+Let's illustrate this with an example. Imagine a table that tracks the 
total number of pageviews by user (first column of diagram below). Over time, 
whenever a new pageview event is processed, the state of the table is updated 
accordingly. Here, the state changes between different points in time - and 
different revisions of the table - can be represented as a changelog stream 
(second column).
+
+
+
+
+Interestingly, because of the stream-table duality, the same stream 
can be used to reconstruct the original table (third column):
+
+
+
+
+The same mechanism is used, for example, to replicate databases via 
change data capture (CDC) and, within Kafka Streams, to replicate its so-called 
state stores across machines for fault-tolerance.
+The stream-table duality is such an important concept that Kafka 
Streams models it explicitly via the KStream, 
KTable, and GlobalKTable interfaces.
+
+
+Aggregations
+
+An aggregation operation takes one input stream or 
table, and yields a new table by combining multiple input records into a single 
output record. Examples of aggregations are computing counts or sum.
+
+
+
+In the Kafka Streams DSL, an input stream of an 
aggregation can be a KStream or a KTable, but the output stream 
will always be a KTable. This allows Kafka Streams to update an aggregate value 
upon the out-of-order arrival of further records after the value was produced 
and emitted. When such out-of-order arrival happens, the aggregating KStream or 
KTable emits a new aggregate value. Because the output is a KTable, the new 
value is considered to overwrite the old value with the same key in subsequent 
processing steps.

Review comment:
   kk cool, I'll open a ticket.
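
   For readers skimming the quoted docs text, the stream-table duality it
   describes can be sketched in plain Java. This is only an illustration under
   stated assumptions: `DualityDemo`, `record`, `toTable` and `toStream` are
   hypothetical names, and the code uses `java.util` collections rather than
   the Kafka Streams `KStream`/`KTable` API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DualityDemo {
    // Helper to build one key-value record (user, pageview count).
    public static Map.Entry<String, Integer> record(String key, int value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Stream as table: replay the changelog from beginning to end;
    // the latest value for each key wins.
    public static Map<String, Integer> toTable(List<Map.Entry<String, Integer>> changelog) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> rec : changelog) {
            table.put(rec.getKey(), rec.getValue());
        }
        return table;
    }

    // Table as stream: iterate over each key-value entry and emit it as a record.
    public static List<Map.Entry<String, Integer>> toStream(Map<String, Integer> table) {
        List<Map.Entry<String, Integer>> stream = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : table.entrySet()) {
            stream.add(record(entry.getKey(), entry.getValue()));
        }
        return stream;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> changelog = Arrays.asList(
            record("alice", 1), record("charlie", 1), record("alice", 2), record("alice", 3));
        Map<String, Integer> table = toTable(changelog);
        System.out.println("table:  " + table);
        System.out.println("stream: " + toStream(table));
    }
}
```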









[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-09 Thread GitBox


vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-656347070


   The new test passed: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-09--001.1594322852--vvcephei--kafka-10173-upgrade-smoke-system-test-2-5--91174c3b2/report.html
   
   The all-streams tests failed because of the Jenkins restart. Kicked it off 
again: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4021/







[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452483525



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -222,8 +222,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
             log.trace("Loaded offsets from the checkpoint file: {}", loadedCheckpoints);
 
             for (final StateStoreMetadata store : stores.values()) {
+                if (store.corrupted) {
+                    log.error("Tried to initialize store offsets for corrupted store {}", store);
+                    throw new IllegalStateException("Should not initialize offsets for a corrupted task");
+                }
+
                 if (store.changelogPartition == null) {
                     log.info("State store {} is not logged and hence would not be restored", store.stateStore.name());
+                } else if (!store.stateStore.persistent()) {

Review comment:
   I'm not sure I'd call it a spurious warning -- if we don't expect to 
have checkpointed in-memory stores, and we happen to have an offset for one in 
the checkpoint file, it seems reasonable to log a warning









[GitHub] [kafka] mjsax merged pull request #8995: Restore stream-table duality description

2020-07-09 Thread GitBox


mjsax merged pull request #8995:
URL: https://github.com/apache/kafka/pull/8995


   







[GitHub] [kafka] mjsax commented on a change in pull request #8995: Restore stream-table duality description

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8995:
URL: https://github.com/apache/kafka/pull/8995#discussion_r452481312



##
File path: docs/streams/core-concepts.html
##
@@ -170,13 +150,59 @@ Duality of
   or to run interactive
 queries
   against your application's latest processing results. And, beyond its 
internal usage, the Kafka Streams API
   also allows developers to exploit this duality in their own applications.
-  
+
 
-  
+
   Before we discuss concepts such as aggregations
   in Kafka Streams, we must first introduce tables in 
more detail, and talk about the aforementioned stream-table duality.
-  Essentially, this duality means that a stream can be viewed as a table, 
and a table can be viewed as a stream.
-  
+  Essentially, this duality means that a stream can be viewed as a table, 
and a table can be viewed as a stream. Kafka's log compaction feature, for 
example, exploits this duality.
+
+
+
+A simple form of a table is a collection of key-value pairs, also 
called a map or associative array. Such a table may look as follows:
+
+
+
+The stream-table duality describes the close relationship between 
streams and tables.
+
+Stream as Table: A stream can be considered a changelog of 
a table, where each data record in the stream captures a state change of the 
table. A stream is thus a table in disguise, and it can be easily turned into a 
"real" table by replaying the changelog from beginning to end to reconstruct 
the table. Similarly, in a more general analogy, aggregating data records in a 
stream - such as computing the total number of pageviews by user from a stream 
of pageview events - will return a table (here with the key and the value being 
the user and its corresponding pageview count, respectively).
+Table as Stream: A table can be considered a snapshot, at a 
point in time, of the latest value for each key in a stream (a stream's data 
records are key-value pairs). A table is thus a stream in disguise, and it can 
be easily turned into a "real" stream by iterating over each key-value entry in 
the table.
+
+
+
+Let's illustrate this with an example. Imagine a table that tracks the 
total number of pageviews by user (first column of diagram below). Over time, 
whenever a new pageview event is processed, the state of the table is updated 
accordingly. Here, the state changes between different points in time - and 
different revisions of the table - can be represented as a changelog stream 
(second column).
+
+
+
+
+Interestingly, because of the stream-table duality, the same stream 
can be used to reconstruct the original table (third column):
+
+
+
+
+The same mechanism is used, for example, to replicate databases via 
change data capture (CDC) and, within Kafka Streams, to replicate its so-called 
state stores across machines for fault-tolerance.
+The stream-table duality is such an important concept that Kafka 
Streams models it explicitly via the KStream, 
KTable, and GlobalKTable interfaces.
+
+
+Aggregations
+
+An aggregation operation takes one input stream or 
table, and yields a new table by combining multiple input records into a single 
output record. Examples of aggregations are computing counts or sum.
+
+
+
+In the Kafka Streams DSL, an input stream of an 
aggregation can be a KStream or a KTable, but the output stream 
will always be a KTable. This allows Kafka Streams to update an aggregate value 
upon the out-of-order arrival of further records after the value was produced 
and emitted. When such out-of-order arrival happens, the aggregating KStream or 
KTable emits a new aggregate value. Because the output is a KTable, the new 
value is considered to overwrite the old value with the same key in subsequent 
processing steps.

Review comment:
   Unfortunately, and I try to get it into better shape incrementally 
(reading GitHub diffs with long lines is just a pain). It would be awesome if 
somebody (*cough*) could do a PR just fixing it throughout the docs -- the 
current lazy approach is somewhat tiring.









[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-09 Thread GitBox


vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-656341862


   Just a note that this is the backport of 
https://github.com/apache/kafka/pull/8938







[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452478882



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -104,26 +104,27 @@ static void closeStateManager(final Logger log,
         if (stateDirectory.lock(id)) {
             try {
                 stateMgr.close();
-
-                if (wipeStateStore) {
-                    log.debug("Wiping state stores for {} task {}", taskType, id);
-                    // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
-                    // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
-                    // need to re-bootstrap the restoration from the beginning
-                    Utils.delete(stateMgr.baseDir());
-                }
             } catch (final ProcessorStateException e) {
                 firstException.compareAndSet(null, e);
             } finally {
-                stateDirectory.unlock(id);
+                try {
+                    if (wipeStateStore) {
+                        log.debug("Wiping state stores for {} task {}", taskType, id);
+                        // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
+                        // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
+                        // need to re-bootstrap the restoration from the beginning
+                        Utils.delete(stateMgr.baseDir());
+                    }
+                } finally {
+                    stateDirectory.unlock(id);
+                }

Review comment:
   Well, I figured it didn't matter since these both just throw IOException 
which we catch in the outer block. The point was to make sure we unlock it. But 
I'll check out `ExceptionUtils#executeAll`









[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud

2020-07-09 Thread Apurva Mehta (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154913#comment-17154913
 ] 

Apurva Mehta commented on KAFKA-10226:
--

It would be good to add a `producer.send(...); producer.flush(); 
producer.close()` to that test case and see if it throws an exception.

> KStream without SASL information should return error in confluent cloud
> ---
>
> Key: KAFKA-10226
> URL: https://issues.apache.org/jira/browse/KAFKA-10226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, streams
>Affects Versions: 2.5.0
>Reporter: Werner Daehn
>Priority: Minor
>
> I have created a KStream against the Confluent cloud and wondered why no data 
> has been received from the source. Reason was that I forgot to add the SASL 
> api keys and secrets.
>  
> For end users this might lead to usability issues. If the KStream wants to 
> read from a topic and is not allowed to, this should raise an error, not be 
> silently ignored.
>  
> How do producer/consumer clients handle that situation?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud

2020-07-09 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154909#comment-17154909
 ] 

John Roesler commented on KAFKA-10226:
--

Hi [~wdaehn] ,

I'm no expert on this stuff, but I think that the constructor alone may not 
actually connect to the server. Do you also get no exception if you try to send 
and close?

IIRC, the constructor does validate that the provided host is resolvable by 
DNS, which is probably why you get an immediate exception on a bad hostname.

Thanks,

-John

> KStream without SASL information should return error in confluent cloud
> ---
>
> Key: KAFKA-10226
> URL: https://issues.apache.org/jira/browse/KAFKA-10226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, streams
>Affects Versions: 2.5.0
>Reporter: Werner Daehn
>Priority: Minor
>
> I have created a KStream against the Confluent cloud and wondered why no data 
> has been received from the source. Reason was that I forgot to add the SASL 
> api keys and secrets.
>  
> For end users this might lead to usability issues. If the KStream wants to 
> read from a topic and is not allowed to, this should raise an error, not be 
> silently ignored.
>  
> How do producer/consumer clients handle that situation?





[jira] [Assigned] (KAFKA-9584) Removing headers causes ConcurrentModificationException

2020-07-09 Thread Micah Ramos (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Micah Ramos reassigned KAFKA-9584:
--

Assignee: (was: Micah Ramos)

> Removing headers causes ConcurrentModificationException
> ---
>
> Key: KAFKA-9584
> URL: https://issues.apache.org/jira/browse/KAFKA-9584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Micah Ramos
>Priority: Minor
>
> The consumer record that is used during punctuate is static; this can cause 
> java.util.ConcurrentModificationException when modifying the headers. 
> Using a single instance of ConsumerRecord for all punctuates causes other 
> strange behavior:
>  # Headers are shared across partitions.
>  # A topology that adds a single header could append an infinite number of 
> headers (one per punctuate iteration), causing memory problems in the current 
> topology as well as downstream consumers since the headers are written with 
> the record when it is produced to a topic.  
>  
> I would expect that each invocation of punctuate would be initialized with a 
> new header object.
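
The two symptoms reported above can be illustrated with a small, self-contained
sketch in plain Java. It is an assumption-laden illustration, not the Kafka
Streams code path: `SharedHeadersDemo`, `punctuate` and `removeWhileIterating`
are hypothetical names, and a `List<String>` stands in for the record headers.
It shows one way a `ConcurrentModificationException` can arise when a header
collection is modified while it is being iterated, and how a header collection
shared across invocations keeps growing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class SharedHeadersDemo {
    // One "punctuate" call: appends a single header and reports the header count.
    public static int punctuate(List<String> headers) {
        headers.add("trace-id");
        return headers.size();
    }

    // Removing an element through the list while a for-each loop iterates over it
    // invalidates the iterator; returns true if the exception was observed.
    public static boolean removeWhileIterating() {
        List<String> headers = new ArrayList<>(Arrays.asList("a", "b", "c"));
        try {
            for (String header : headers) {
                if (header.equals("a")) {
                    headers.remove(header);
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Shared instance: headers accumulate, one per invocation.
        List<String> shared = new ArrayList<>();
        int sharedCount = 0;
        for (int i = 0; i < 3; i++) {
            sharedCount = punctuate(shared);
        }
        // Fresh instance per invocation: the count does not grow.
        int freshCount = punctuate(new ArrayList<>());
        System.out.println("shared=" + sharedCount + ", fresh=" + freshCount
            + ", cme=" + removeWhileIterating());
    }
}
```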





[jira] [Resolved] (KAFKA-10256) Create a server gradle module for Java code needed only by servers

2020-07-09 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-10256.
--
Resolution: Won't Fix

> Create a server gradle module for Java code needed only by servers
> --
>
> Key: KAFKA-10256
> URL: https://issues.apache.org/jira/browse/KAFKA-10256
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>
> It doesn't really make sense to have a "server" directory in the "clients" 
> gradle module.  The client is not the server.  Instead, we should have a 
> separate gradle module for code which is server-specific.
> This will avoid polluting the client CLASSPATH with code which is internal to 
> the server, and make the functional division of the code clearer.





[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


dajac commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452440483



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
     }
 
     /**
-     * Return the Set of nodes whose connection setup has timed out.
+     * Return the List of nodes whose connection setup has timed out.
      * @param now the current time in ms
      */
-    public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    public List<String> nodesWithConnectionSetupTimeout(long now) {
         return connectingNodes.stream()
             .filter(id -> isConnectionSetupTimeout(id, now))
-            .collect(Collectors.toSet());
+            .collect(Collectors.toCollection(LinkedList::new));

Review comment:
   Done.
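
   As a rough, standalone illustration of the collector used in the diff, the
   sketch below filters a list of node ids and collects the matches into a
   `LinkedList` via `Collectors.toCollection`. The class
   `TimedOutNodesDemo`, the `connectStartMs` map and the timeout rule are
   assumptions invented for this example and do not mirror
   `ClusterConnectionStates`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TimedOutNodesDemo {
    // Returns the ids whose connection attempt started more than timeoutMs before now.
    // connectStartMs must contain a start time for every id in connectingNodes.
    public static List<String> nodesWithSetupTimeout(List<String> connectingNodes,
                                                     Map<String, Long> connectStartMs,
                                                     long timeoutMs,
                                                     long now) {
        return connectingNodes.stream()
            .filter(id -> now - connectStartMs.get(id) > timeoutMs)
            .collect(Collectors.toCollection(LinkedList::new));
    }

    public static void main(String[] args) {
        Map<String, Long> start = new HashMap<>();
        start.put("1", 0L);
        start.put("2", 900L);
        start.put("3", 100L);
        System.out.println(nodesWithSetupTimeout(Arrays.asList("1", "2", "3"), start, 500L, 1000L));
    }
}
```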









[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452440068



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                         log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
                                   store.stateStore.name(), store.offset, store.changelogPartition);
                     } else {
-                        // with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+                        // With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
                         // and hence we are uncertain that the current local state only contains committed data;
                         // in that case we need to treat it as a task-corrupted exception
-                        if (eosEnabled && !storeDirIsEmpty) {
+
+                        // Note, this is a little overzealous, since we aren't checking whether the store's specific
+                        // directory is nonempty, only if there are any directories for any stores. So if there are
+                        // two stores in a task, and one is correctly written and checkpointed, while the other is
+                        // neither written nor checkpointed, we _could_ correctly load the first and recover the second
+                        // but instead we'll consider the whole task corrupted and discard the first and recover both.

Review comment:
   https://issues.apache.org/jira/browse/KAFKA-10260









[jira] [Created] (KAFKA-10260) Streams could recover stores in a task independently

2020-07-09 Thread John Roesler (Jira)
John Roesler created KAFKA-10260:


 Summary: Streams could recover stores in a task independently
 Key: KAFKA-10260
 URL: https://issues.apache.org/jira/browse/KAFKA-10260
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


Currently, ProcessorStateManager checks for corrupted tasks by requiring, for 
each persistent store whose checkpoint is missing, that the task directory 
also be empty.

This is a little overzealous, since we aren't checking whether the store's 
specific directory is nonempty, only if there are any directories for any 
stores. So if there are two stores in a task, and one is correctly written and 
checkpointed, while the other is neither written nor checkpointed, we _could_ 
correctly load the first and recover the second but instead we'll consider the 
whole task corrupted and discard the first and recover both.

The fix would be to check, for each persistent store that doesn't have a 
checkpoint, that its _specific_ store directory is also missing. Such a store 
will be restored from the changelog and we don't need to consider the task 
corrupted.

See ProcessorStateManager#initializeStoreOffsetsFromCheckpoint
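
The difference between the current task-level check and the proposed
store-level check can be sketched as two small decision functions. This is a
simplified model under stated assumptions, not the ProcessorStateManager code:
`StoreRecoveryDemo`, `Action`, `taskLevelCheck` and `storeLevelCheck` are
hypothetical names, and the sketch assumes EOS is enabled and ignores every
other condition the real method handles.

```java
public class StoreRecoveryDemo {
    public enum Action { LOAD_FROM_CHECKPOINT, RESTORE_FROM_CHANGELOG, TASK_CORRUPTED }

    // Current behaviour as described in the ticket: a store without a checkpoint
    // marks the task corrupted whenever the task directory holds anything at all.
    public static Action taskLevelCheck(boolean hasCheckpoint, boolean taskDirIsEmpty) {
        if (hasCheckpoint) {
            return Action.LOAD_FROM_CHECKPOINT;
        }
        return taskDirIsEmpty ? Action.RESTORE_FROM_CHANGELOG : Action.TASK_CORRUPTED;
    }

    // Proposed behaviour: only look at the directory of the store in question.
    public static Action storeLevelCheck(boolean hasCheckpoint, boolean storeDirExists) {
        if (hasCheckpoint) {
            return Action.LOAD_FROM_CHECKPOINT;
        }
        return storeDirExists ? Action.TASK_CORRUPTED : Action.RESTORE_FROM_CHANGELOG;
    }

    public static void main(String[] args) {
        // Two stores in one task: A is written and checkpointed, B is neither.
        // The task directory is non-empty because A's files exist.
        System.out.println("task-level,  store B: " + taskLevelCheck(false, false));
        System.out.println("store-level, store A: " + storeLevelCheck(true, true));
        System.out.println("store-level, store B: " + storeLevelCheck(false, false));
    }
}
```

In the two-store scenario from the description, the task-level check reports
the task as corrupted because of store B, while the store-level check loads A
from its checkpoint and restores only B.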





[GitHub] [kafka] omkreddy closed pull request #8937: MINOR: Create ChannelBuilder for each connection in ConnectionStressWorker workload

2020-07-09 Thread GitBox


omkreddy closed pull request #8937:
URL: https://github.com/apache/kafka/pull/8937


   







[GitHub] [kafka] vvcephei commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


vvcephei commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452433188



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -104,26 +104,27 @@ static void closeStateManager(final Logger log,
 if (stateDirectory.lock(id)) {
 try {
 stateMgr.close();
-
-if (wipeStateStore) {
-log.debug("Wiping state stores for {} task {}", taskType, id);
-// we can just delete the whole dir of the task, including the state store images and the checkpoint files,
-// and then we write an empty checkpoint file indicating that the previous close is graceful and we just
-// need to re-bootstrap the restoration from the beginning
-Utils.delete(stateMgr.baseDir());
-}
 } catch (final ProcessorStateException e) {
 firstException.compareAndSet(null, e);
 } finally {
-stateDirectory.unlock(id);
+try {
+if (wipeStateStore) {
+log.debug("Wiping state stores for {} task {}", taskType, id);
+// we can just delete the whole dir of the task, including the state store images and the checkpoint files,
+// and then we write an empty checkpoint file indicating that the previous close is graceful and we just
+// need to re-bootstrap the restoration from the beginning
+Utils.delete(stateMgr.baseDir());
+}
+} finally {
+stateDirectory.unlock(id);
+}

Review comment:
   I take it this block can also throw an exception? We shouldn't throw 
exceptions inside a finally block, because it becomes hard to reason about 
which exception ultimately propagates: if the first try block also threw, the 
exception from the finally block replaces it and the original one is lost.
   
   To make this simpler to grapple with, we added 
org.apache.kafka.streams.state.internals.ExceptionUtils#executeAll
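
   As an illustration of the general idea (run every cleanup step, then surface the first failure), here is a minimal sketch. `CleanupRunner` and `runAll` are hypothetical names chosen for this example; this is not the signature or implementation of the Kafka helper named above.

```java
final class CleanupRunner {

    // Runs every step even if an earlier one fails. The first failure is
    // rethrown after all steps have run; later failures are attached to it
    // as suppressed exceptions so none of them is silently dropped.
    static void runAll(final Runnable... steps) {
        RuntimeException first = null;
        for (final Runnable step : steps) {
            try {
                step.run();
            } catch (final RuntimeException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

   Applied to the hunk above, the wipe and the unlock would be two separate steps, so a failing wipe cannot prevent the unlock and cannot mask an earlier exception.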

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -222,8 +222,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
 log.trace("Loaded offsets from the checkpoint file: {}", loadedCheckpoints);
 
 for (final StateStoreMetadata store : stores.values()) {
+if (store.corrupted) {
+log.error("Tried to initialize store offsets for corrupted store {}", store);
+throw new IllegalStateException("Should not initialize offsets for a corrupted task");
+}
+
 if (store.changelogPartition == null) {
 log.info("State store {} is not logged and hence would not be restored", store.stateStore.name());
+} else if (!store.stateStore.persistent()) {

Review comment:
   I think we should also remove the changelogPartition from 
loadedCheckpoints, if it exists. Otherwise, we'll spuriously warn in L267.









[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452425472



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,24 +45,23 @@
 protected ProcessorNode currentNode;
 private long currentSystemTimeMs;
 
-final StateManager stateManager;

Review comment:
   If I understand you correctly, you propose to have a processor state 
manager reference in the `AbstractProcessorContext` and in 
`ProcessorContextImpl` instead of in the `GlobalProcessorContextImpl` and in 
`ProcessorContextImpl`. Moreover, you want to have a method `stateManager()` in 
`AbstractProcessorContext` that is overridden only in `ProcessorContextImpl`.
   
   FWIW, I think it is cleaner to have the references in each child and an 
abstract method `stateManager()` in `AbstractProcessorContext` that is 
overridden in both children. My reasoning is that both children have a state 
manager that is used in `AbstractProcessorContext` (i.e., both should provide a 
method `stateManager()`) but each child uses a different type of state manager 
internally.
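
   A minimal sketch of that shape, with an abstract accessor in the parent and the concrete reference held by each child. All type names below are hypothetical stand-ins, not the real Kafka Streams classes:

```java
// Hypothetical stand-ins for the state manager types; not the Kafka classes.
interface SketchStateManager {
    String kind();
}

final class SketchTaskStateManager implements SketchStateManager {
    @Override
    public String kind() {
        return "task";
    }
}

final class SketchGlobalStateManager implements SketchStateManager {
    @Override
    public String kind() {
        return "global";
    }
}

abstract class SketchAbstractContext {
    // Each child owns its reference and exposes it through this accessor.
    abstract SketchStateManager stateManager();

    // Shared logic in the parent only needs the common interface.
    String describe() {
        return "backed by " + stateManager().kind() + " state manager";
    }
}

final class SketchTaskContext extends SketchAbstractContext {
    private final SketchTaskStateManager stateManager = new SketchTaskStateManager();

    @Override
    SketchTaskStateManager stateManager() { // covariant return: the concrete type
        return stateManager;
    }
}

final class SketchGlobalContext extends SketchAbstractContext {
    private final SketchGlobalStateManager stateManager = new SketchGlobalStateManager();

    @Override
    SketchGlobalStateManager stateManager() {
        return stateManager;
    }
}
```

   The covariant return types let each child use its specific state manager without casts, while the parent stays agnostic of which one it is.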









[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452424595



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
 log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
   store.stateStore.name(), store.offset, store.changelogPartition);
 } else {
-// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
 // and hence we are uncertain that the current local state only contains committed data;
 // in that case we need to treat it as a task-corrupted exception
-if (eosEnabled && !storeDirIsEmpty) {
+
+// Note, this is a little overzealous, since we aren't checking whether the store's specific
+// directory is nonempty, only if there are any directories for any stores. So if there are
+// two stores in a task, and one is correctly written and checkpointed, while the other is
+// neither written nor checkpointed, we _could_ correctly load the first and recover the second
+// but instead we'll consider the whole task corrupted and discard the first and recover both.
+if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) {

Review comment:
   Oh, I put this comment on the wrong line:
   
   > FYI, this is also fixed (better) in #8996
   
   In other words, I agree, I like your fix better as well.









[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452419917



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -193,6 +196,26 @@ private void closeAndRevive(final Map> taskWith
 log.error("Error suspending corrupted task {} ", task.id(), swallow);
 }
 task.closeDirty();
+if (task.isActive()) {
+// Pause so we won't poll any more records for this task until it has been re-initialized
+// Note, closeDirty already clears the partitiongroup for the task.
+final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
+final Set<TopicPartition> assignedToPauseAndReset =
+Utils.intersection(HashSet::new, currentAssignment, task.inputPartitions());

Review comment:
   FYI, this is also fixed (better) in 
https://github.com/apache/kafka/pull/8996









[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


vvcephei commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452423901



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
 log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
   store.stateStore.name(), store.offset, store.changelogPartition);
 } else {
-// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
 // and hence we are uncertain that the current local state only contains committed data;
 // in that case we need to treat it as a task-corrupted exception
-if (eosEnabled && !storeDirIsEmpty) {
+
+// Note, this is a little overzealous, since we aren't checking whether the store's specific
+// directory is nonempty, only if there are any directories for any stores. So if there are
+// two stores in a task, and one is correctly written and checkpointed, while the other is
+// neither written nor checkpointed, we _could_ correctly load the first and recover the second
+// but instead we'll consider the whole task corrupted and discard the first and recover both.

Review comment:
   IIRC, @guozhangwang tried to include this during the February refactor, 
but it's harder to get right than it sounds. Still, it would be very nice to 
have it, and I agree it would be good to file a ticket.









[GitHub] [kafka] vvcephei merged pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


vvcephei merged pull request #8902:
URL: https://github.com/apache/kafka/pull/8902


   







[jira] [Created] (KAFKA-10259) KIP-554: Add Broker-side SCRAM Config API

2020-07-09 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-10259:
-

 Summary: KIP-554: Add Broker-side SCRAM Config API
 Key: KAFKA-10259
 URL: https://issues.apache.org/jira/browse/KAFKA-10259
 Project: Kafka
  Issue Type: New Feature
Reporter: Ron Dagostino
Assignee: Ron Dagostino








[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-09 Thread GitBox


vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-656291661


   Testing again:
   the new test: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4018/
   all streams tests: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4019/







[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-656290123


   Thanks, all. I think we can merge this now.







[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


cadonna commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452415850



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,12 @@ public void init(final ProcessorContext context,
 
 @SuppressWarnings("unchecked")
 void initStoreSerde(final ProcessorContext context) {
+final String storeName = name();
+final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
 serdes = new StateSerdes<>(
-ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+ changelogTopic != null ?
+changelogTopic :
+ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),

Review comment:
   I would not put any code that is not related to casts of 
`ProcessorContext` into `ProcessorContextUtils`. I think the goal of 
`ProcessorContextUtils` is to contain all the code that we want to get rid of in 
the future once the casts are fixed.
   
   We could move the `null` check into the constructor of `StateSerdes`, since we 
already do a `null` check there. 









[jira] [Commented] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop

2020-07-09 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154790#comment-17154790
 ] 

Chris Egerton commented on KAFKA-10253:
---

[~klalafaryan] can you provide the configurations for all three of your workers?

It would also be helpful if you could restart one of the workers and capture 
the logs for it from startup through the first few iterations of this rebalance 
loop.

> Kafka Connect gets into an infinite rebalance loop
> --
>
> Key: KAFKA-10253
> URL: https://issues.apache.org/jira/browse/KAFKA-10253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Konstantin Lalafaryan
>Priority: Blocker
>
> Hello everyone!
>  
> We are running kafka-connect cluster  (3 workers) and very often it gets into 
> an infinite rebalance loop.
>  
> {code:java}
> 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Was selected to perform assignments, but do not have latest 
> config found in sync request. Returning an empty configuration to trigger 
> re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Successfully joined group with generation 305655831 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Joined group at generation 305655831 with protocol version 2 
> and got assignment: Assignment{error=1, 
> leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', 
> leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Was selected to perform assignments, but do not have latest 
> config found in sync request. Returning an empty configuration to trigger 
> re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Successfully joined group with generation 305655832 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Joined group at generation 305655832 with protocol version 2 
> and got assignment: Assignment{error=1, 
> leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', 
> leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], 
> taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Was selected to perform assignments, but do not have latest 
> config found in sync request. Returning an empty configuration to trigger 
> re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) 
> [DistributedHerder-connect-1-1]
> 2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= 
> kafka-connect] Successfully joined gr

[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452410965



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -95,7 +96,7 @@
  *  |  | Assigned (3)| <+
  *  |  +-+---+  |
  *  ||  |
- *  ||  |
+ *  ||--+

Review comment:
   Where does this self-transition happen exactly? And could/should we 
detect this case and not call `setState()` for this case instead of allowing 
the transition?









[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452408268



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
 log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
   store.stateStore.name(), store.offset, store.changelogPartition);
 } else {
-// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
 // and hence we are uncertain that the current local state only contains committed data;
 // in that case we need to treat it as a task-corrupted exception
-if (eosEnabled && !storeDirIsEmpty) {
+
+// Note, this is a little overzealous, since we aren't checking whether the store's specific
+// directory is nonempty, only if there are any directories for any stores. So if there are
+// two stores in a task, and one is correctly written and checkpointed, while the other is
+// neither written nor checkpointed, we _could_ correctly load the first and recover the second
+// but instead we'll consider the whole task corrupted and discard the first and recover both.

Review comment:
   Sounds like something we should fix. Can you file a ticket?









[jira] [Updated] (KAFKA-6453) Document timestamp propagation semantics

2020-07-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6453:
---
Description: 
Atm, Kafka Streams only has a defined "contract" about timestamp propagation at 
the Processor API level: all processor within a sub-topology, see the timestamp 
from the input topic record and this timestamp will be used for all result 
record when writing them to an topic, too.

The DSL, inherits this "contract" atm.

From a DSL point of view, it would be desirable to provide a different 
contract to the user. To allow this, we need to do the following:

 - extend Processor API to allow manipulation timestamps (ie, a Processor can 
set a new timestamp for downstream records)
  - define a DSL "contract" for timestamp propagation for each DSL operator
  - document the DSL "contract"
  - implement the DSL "contract" using the new/extended Processor API

Changing the DSL contract etc was done via KIP-258. This ticket is about 
documenting the contract.

  was:
Atm, Kafka Streams only has a defined "contract" about timestamp propagation at 
the Processor API level: all processor within a sub-topology, see the timestamp 
from the input topic record and this timestamp will be used for all result 
record when writing them to an topic, too.

The DSL, inherits this "contract" atm.

From a DSL point of view, it would be desirable to provide a different 
contract to the user. To allow this, we need to do the following:

 - extend Processor API to allow manipulation timestamps (ie, a Processor can 
set a new timestamp for downstream records)
 - define a DSL "contract" for timestamp propagation for each DSL operator
 - document the DSL "contract"
 - implement the DSL "contract" using the new/extended Processor API


> Document timestamp propagation semantics
> 
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: James Galasyn
>Priority: Major
>  Labels: needs-kip
>
> Atm, Kafka Streams only has a defined "contract" about timestamp propagation 
> at the Processor API level: all processor within a sub-topology, see the 
> timestamp from the input topic record and this timestamp will be used for all 
> result record when writing them to an topic, too.
> The DSL, inherits this "contract" atm.
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend Processor API to allow manipulation timestamps (ie, a Processor can 
> set a new timestamp for downstream records)
>   - define a DSL "contract" for timestamp propagation for each DSL operator
>   - document the DSL "contract"
>   - implement the DSL "contract" using the new/extended Processor API
> Changing the DSL contract etc was done via KIP-258. This ticket is about 
> documenting the contract.





[jira] [Updated] (KAFKA-6453) Document timestamp propagation semantics

2020-07-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6453:
---
Summary: Document timestamp propagation semantics  (was: Reconsider 
timestamp propagation semantics)

> Document timestamp propagation semantics
> 
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: James Galasyn
>Priority: Major
>  Labels: needs-kip
>
> Atm, Kafka Streams only has a defined "contract" about timestamp propagation 
> at the Processor API level: all processor within a sub-topology, see the 
> timestamp from the input topic record and this timestamp will be used for all 
> result record when writing them to an topic, too.
> The DSL, inherits this "contract" atm.
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend Processor API to allow manipulation timestamps (ie, a Processor can 
> set a new timestamp for downstream records)
>  - define a DSL "contract" for timestamp propagation for each DSL operator
>  - document the DSL "contract"
>  - implement the DSL "contract" using the new/extended Processor API





[jira] [Assigned] (KAFKA-6453) Reconsider timestamp propagation semantics

2020-07-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6453:
--

Assignee: James Galasyn  (was: Victoria Bialas)

> Reconsider timestamp propagation semantics
> --
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: James Galasyn
>Priority: Major
>  Labels: needs-kip
>
> Atm, Kafka Streams only has a defined "contract" about timestamp propagation 
> at the Processor API level: all processor within a sub-topology, see the 
> timestamp from the input topic record and this timestamp will be used for all 
> result record when writing them to an topic, too.
> The DSL, inherits this "contract" atm.
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend Processor API to allow manipulation timestamps (ie, a Processor can 
> set a new timestamp for downstream records)
>  - define a DSL "contract" for timestamp propagation for each DSL operator
>  - document the DSL "contract"
>  - implement the DSL "contract" using the new/extended Processor API





[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


dajac commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452400759



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
 }
 
 /**
- * Return the Set of nodes whose connection setup has timed out.
+ * Return the List of nodes whose connection setup has timed out.
  * @param now the current time in ms
  */
-public Set<String> nodesWithConnectionSetupTimeout(long now) {
+public List<String> nodesWithConnectionSetupTimeout(long now) {
 return connectingNodes.stream()
 .filter(id -> isConnectionSetupTimeout(id, now))
-.collect(Collectors.toSet());
+.collect(Collectors.toCollection(LinkedList::new));

Review comment:
   That’s good to know. I always thought that the iteration was more or 
less equivalent. I have learnt something today. Let me update that.









[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


ijuma commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452396471



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
 }
 
 /**
- * Return the Set of nodes whose connection setup has timed out.
+ * Return the List of nodes whose connection setup has timed out.
  * @param now the current time in ms
  */
-public Set<String> nodesWithConnectionSetupTimeout(long now) {
+public List<String> nodesWithConnectionSetupTimeout(long now) {
 return connectingNodes.stream()
 .filter(id -> isConnectionSetupTimeout(id, now))
-.collect(Collectors.toSet());
+.collect(Collectors.toCollection(LinkedList::new));

Review comment:
   FYI https://twitter.com/joshbloch/status/583813919019573248 :)









[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


ijuma commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452395964



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
 }
 
 /**
- * Return the Set of nodes whose connection setup has timed out.
+ * Return the List of nodes whose connection setup has timed out.
  * @param now the current time in ms
  */
-public Set<String> nodesWithConnectionSetupTimeout(long now) {
+public List<String> nodesWithConnectionSetupTimeout(long now) {
 return connectingNodes.stream()
 .filter(id -> isConnectionSetupTimeout(id, now))
-.collect(Collectors.toSet());
+.collect(Collectors.toCollection(LinkedList::new));

Review comment:
   Not at all. This is a perfect case for ArrayList. LinkedList iteration 
is very slow due to pointer chasing (in comparison).
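
A minimal sketch of the `ArrayList` alternative suggested in this comment. It assumes the connecting node ids are strings; the class `TimedOutNodesSketch` and the `isTimedOut` predicate are hypothetical stand-ins for `ClusterConnectionStates` and `isConnectionSetupTimeout`, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for ClusterConnectionStates; only the collection choice matters here.
class TimedOutNodesSketch {

    // Collect the matching node ids into an ArrayList: appends are amortized O(1),
    // and iteration walks a contiguous array instead of following node pointers.
    static List<String> timedOutNodes(Set<String> connectingNodes, Predicate<String> isTimedOut) {
        return connectingNodes.stream()
                .filter(isTimedOut)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        Set<String> connecting = new LinkedHashSet<>(Arrays.asList("0", "1", "2"));
        // Assumption for the demo only: odd-numbered ids have timed out.
        System.out.println(timedOutNodes(connecting, id -> Integer.parseInt(id) % 2 == 1));
    }
}
```

The caller only appends and then iterates, so nothing here needs the cheap mid-list insertion or removal that a linked list offers.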









[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452394510



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -103,8 +104,12 @@ public void init(final ProcessorContext context,
 
 @SuppressWarnings("unchecked")
 void initStoreSerde(final ProcessorContext context) {
+final String storeName = name();
+final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName);
 serdes = new StateSerdes<>(
-ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+ changelogTopic != null ?
+changelogTopic :
+ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName),

Review comment:
   Ah thanks. I missed this case.
   
   However, should we move both `null` checks into 
`ProcessorContextUtils.changelogFor()` for this case? It seems we do the same 
"outer" `null` check each time we call the method, so why not do it in a single 
place in the code?
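
One way the suggestion above could look, as a rough sketch only. All names here (`ChangelogLookupSketch`, the `registeredTopics` map, `defaultChangelogTopic`) are hypothetical, and the default naming scheme is an assumption made for illustration rather than a statement about the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: names are illustrative, not the Kafka Streams API.
class ChangelogLookupSketch {

    // Assumed default naming scheme, used here only for illustration.
    static String defaultChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // The single place that resolves the topic: prefer the registered topic and
    // fall back to the default, so call sites never repeat the null check.
    static String changelogFor(Map<String, String> registeredTopics,
                               String applicationId,
                               String storeName) {
        String registered = registeredTopics == null ? null : registeredTopics.get(storeName);
        return registered != null ? registered : defaultChangelogTopic(applicationId, storeName);
    }

    public static void main(String[] args) {
        Map<String, String> registered = new HashMap<>();
        registered.put("counts", "custom-counts-topic");
        System.out.println(changelogFor(registered, "my-app", "counts"));
        System.out.println(changelogFor(registered, "my-app", "totals"));
    }
}
```

With the fallback inside the helper, each metered store would call it once instead of repeating the ternary.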









[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set

2020-07-09 Thread GitBox


dajac commented on a change in pull request #8999:
URL: https://github.com/apache/kafka/pull/8999#discussion_r452393587



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) {
 }
 
 /**
- * Return the Set of nodes whose connection setup has timed out.
+ * Return the List of nodes whose connection setup has timed out.
  * @param now the current time in ms
  */
-public Set nodesWithConnectionSetupTimeout(long now) {
+public List nodesWithConnectionSetupTimeout(long now) {
 return connectingNodes.stream()
 .filter(id -> isConnectionSetupTimeout(id, now))
-.collect(Collectors.toSet());
+.collect(Collectors.toCollection(LinkedList::new));

Review comment:
   We only add elements to the List and then iterate over it. A LinkedList 
seems slightly better for this case.









[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


mjsax commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r452391941



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##
@@ -45,24 +45,23 @@
 protected ProcessorNode currentNode;
 private long currentSystemTimeMs;
 
-final StateManager stateManager;

Review comment:
   I see. Would it not be sufficient to just keep a ("duplicate") reference 
to `ProcessorStateManager` within `ProcessorContextImpl`? 
   
   Just to clarify: I am ok with the proposed changes. Just wondering if it's 
really the best structure.









[GitHub] [kafka] ableegoldman commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


ableegoldman commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-656265143


   Aw...the EosBetaUpgradeIntegrationTest failed again?? :/ 







[GitHub] [kafka] cadonna commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-09 Thread GitBox


cadonna commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-656263751


   Unrelated test failure:
   
   ```
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```







[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452372357



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##
@@ -784,7 +784,7 @@ public void close() {
 }
 
 @Test
-public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOException {
+public void shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws IOException {
 final long checkpointOffset = 10L;
 
 final Map offsets = mkMap(

Review comment:
   Sorry, yeah, the relevant part doesn't show up on GitHub. Basically we 
register
   ```
   stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback);
   stateMgr.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback);
   stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
   ```
   
   but only write the checkpoint for the `persistentStorePartition`, 
`nonPersistentStorePartition` and `irrelevantPartition`. I think the point of 
the `irrelevantPartition` is to make sure that we detect that the 
`persistentStoreTwoPartition` offset is missing even though the checkpoint 
technically has the correct number of offsets in total, i.e., that we actually 
map the offsets to a registered changelog.
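
The point about counts versus mapping can be illustrated with a small sketch. It assumes partitions are represented as plain strings; `CheckpointCoverageSketch` and `missingOffsets` are hypothetical names and do not come from `ProcessorStateManager`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: partitions are plain strings, not Kafka TopicPartition objects.
class CheckpointCoverageSketch {

    // Returns every registered changelog partition that has no checkpointed offset.
    // Comparing sizes alone would miss this when an unrelated partition pads the file.
    static Set<String> missingOffsets(Set<String> registeredChangelogs, Map<String, Long> checkpoint) {
        Set<String> missing = new TreeSet<>(registeredChangelogs);
        missing.removeAll(checkpoint.keySet());
        return missing;
    }

    public static void main(String[] args) {
        Set<String> registered = new HashSet<>(Arrays.asList(
                "persistentStorePartition", "persistentStoreTwoPartition", "nonPersistentStorePartition"));
        Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put("persistentStorePartition", 10L);
        checkpoint.put("nonPersistentStorePartition", 10L);
        checkpoint.put("irrelevantPartition", 10L);

        // Same number of entries on both sides, yet one registered changelog is uncovered.
        System.out.println(registered.size() == checkpoint.size());
        System.out.println(missingOffsets(registered, checkpoint));
    }
}
```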









[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


mjsax commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656261358


   Retest this please.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452372357



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##
@@ -784,7 +784,7 @@ public void close() {
 }
 
 @Test
-public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOException {
+public void shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws IOException {
 final long checkpointOffset = 10L;
 
 final Map offsets = mkMap(

Review comment:
   Sorry, yeah, the relevant part doesn't show up on GitHub. Basically we 
register
   ```
   stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback);
   stateMgr.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback);
   stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
   ```
   
   but only write the checkpoint for the `persistentStorePartition`, 
`nonPersistentStorePartition` and `irrelevantPartition`. I think the point of 
the `irrelevantPartition` is to make sure that we detect that the 
`persistentStoreTwoPartition` offset is missing even though the checkpoint 
technically has the correct number of offsets in total, i.e., that we actually 
map the offsets to a registered changelog.









[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


mjsax commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656261242


   Retest this please.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452381474



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
 log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
   store.stateStore.name(), store.offset, store.changelogPartition);
 } else {
-// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
 // and hence we are uncertain that the current local state only contains committed data;
 // in that case we need to treat it as a task-corrupted exception
-if (eosEnabled && !storeDirIsEmpty) {
+
+// Note, this is a little overzealous, since we aren't checking whether the store's specific
+// directory is nonempty, only if there are any directories for any stores. So if there are
+// two stores in a task, and one is correctly written and checkpointed, while the other is
+// neither written nor checkpointed, we _could_ correctly load the first and recover the second
+// but instead we'll consider the whole task corrupted and discard the first and recover both.
+if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) {

Review comment:
   I like my fix better :P 
   
   But seriously, no need to block this PR on mine if it's suddenly causing 
tests to fail. Mysterious..









[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-09 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r452374925



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, RequestFuture f
  *   value of each partition may be null only for v0. In v1 and later the ListOffset API would not
  *   return a null timestamp (-1 is returned instead when necessary).
  */
-private void handleListOffsetResponse(Map timestampsToSearch,
+private void handleListOffsetResponse(Map timestampsToSearch,
   ListOffsetResponse listOffsetResponse,
   RequestFuture future) {
 Map fetchedOffsets = new HashMap<>();
 Set partitionsToRetry = new HashSet<>();
 Set unauthorizedTopics = new HashSet<>();
 
-for (Map.Entry entry : timestampsToSearch.entrySet()) {
+Map partitionsData = byTopicPartitions(listOffsetResponse.responseData());

Review comment:
   We have now added logic in the AdminClient to handle partial responses from 
brokers (based on 
https://github.com/apache/kafka/pull/8295#discussion_r449575550). Shouldn't we 
do the same here instead of assuming the response is always complete? I'm not 
even sure if we should retry if a resource is missing from the response, but we 
could at least log it instead of hitting an NPE.
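
A hedged sketch of the defensive handling asked for above. It assumes partitions are plain strings and offsets are `Long` values; `PartialResponseSketch`, `collectOffsets` and the `missingLog` list are hypothetical, standing in for the `Fetcher` logic and its logger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: strings and Longs stand in for TopicPartition and offset data.
class PartialResponseSketch {

    // Walk the requested partitions and look each one up in the response.
    // A partition absent from the response is recorded instead of being
    // dereferenced, which is what would otherwise surface as an NPE.
    static Map<String, Long> collectOffsets(Set<String> requested,
                                            Map<String, Long> response,
                                            List<String> missingLog) {
        Map<String, Long> fetched = new HashMap<>();
        for (String partition : requested) {
            Long offset = response.get(partition);
            if (offset == null) {
                missingLog.add("Missing partition in ListOffset response: " + partition);
                continue;
            }
            fetched.put(partition, offset);
        }
        return fetched;
    }

    public static void main(String[] args) {
        Set<String> requested = new TreeSet<>(Arrays.asList("topic-0", "topic-1"));
        Map<String, Long> response = new HashMap<>();
        response.put("topic-0", 42L);

        List<String> missingLog = new ArrayList<>();
        System.out.println(collectOffsets(requested, response, missingLog));
        System.out.println(missingLog);
    }
}
```

Whether a missing partition should also be retried is left open here, as it is in the comment.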









[GitHub] [kafka] vinothchandar commented on pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests

2020-07-09 Thread GitBox


vinothchandar commented on pull request #8948:
URL: https://github.com/apache/kafka/pull/8948#issuecomment-656251801


   @cmccabe this PR is a sub-task under KAFKA-10131, which tracks the bigger 
goal. I filed a new issue explicitly targeting removal of `use_zk_connection`. 
For this PR, the public methods it affects, 
`alter_message_format`/`set_unclean_leader_election`, don't offer this flag.
   
   So this PR should be fine for this scope?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452372357



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##
@@ -784,7 +784,7 @@ public void close() {
 }
 
 @Test
-public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOException {
+public void shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws IOException {
 final long checkpointOffset = 10L;
 
 final Map offsets = mkMap(

Review comment:
   Sorry, yeah, the relevant part doesn't show up on GitHub. Basically we 
register
   ```
   stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback);
   stateMgr.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback);
   stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
   ```
   
   but only write the checkpoint for the `persistentStorePartition`, 
`nonPersistentStorePartition` and `irrelevantPartition`. I think the point of 
the `irrelevantPartition` is to make sure that we detect that the 
`persistentStoreTwoPartition` offset is missing even though the checkpoint 
technically has the correct number of offsets in total, i.e., that we actually 
map the offsets to a registered changelog.









[jira] [Created] (KAFKA-10258) Get rid of use_zk_connection flag in kafka.py public methods

2020-07-09 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-10258:
--

 Summary: Get rid of use_zk_connection flag in kafka.py public methods
 Key: KAFKA-10258
 URL: https://issues.apache.org/jira/browse/KAFKA-10258
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vinoth Chandar






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-09 Thread GitBox


vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-656249347


   Ah, and the other smoke test usages are messed up:
   ```
   test_id:    kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_rebalance_simple
   status:     FAIL
   run time:   1 minute 28.121 seconds
   
   
   __init__() takes exactly 4 arguments (3 given)
   Traceback (most recent call last):
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 134, in run
       data = self.run_test()
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 192, in run_test
       return self.test_context.function(self.test)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_eos_test.py", line 41, in test_rebalance_simple
       self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
   TypeError: __init__() takes exactly 4 arguments (3 given)
   ```







[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores

2020-07-09 Thread GitBox


ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452370517



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -104,17 +104,16 @@ static void closeStateManager(final Logger log,
 if (stateDirectory.lock(id)) {
 try {
 stateMgr.close();
-
+} catch (final ProcessorStateException e) {
+firstException.compareAndSet(null, e);
+} finally {
 if (wipeStateStore) {
 log.debug("Wiping state stores for {} task {}", taskType, id);
 // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
 // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
 // need to re-bootstrap the restoration from the beginning
 Utils.delete(stateMgr.baseDir());

Review comment:
   Right, it's not a correctness issue, but it's additional needless 
overhead to go through the whole cycle of initializing a task, getting a 
TaskCorrupted, wiping it then, and finally restarting it. Of course, if we keep 
hitting an issue during `closeDirty` then we might never wipe the state, which 
does seem like a real problem, for example if there's some issue with the 
state, like the files actually being corrupted.
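
The control flow in the diff above can be sketched in isolation. This is an illustration under assumptions: `CloseAndWipeSketch` is hypothetical, the `close` and `wipe` runnables stand in for `stateMgr.close()` and `Utils.delete(...)`, and it catches `RuntimeException` where the diff catches `ProcessorStateException`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the try/catch/finally shape; not the StateManagerUtil code.
class CloseAndWipeSketch {

    // Remember the first close failure but still reach the wipe step,
    // so a failing close can no longer prevent the state from being wiped.
    static boolean closeAndMaybeWipe(Runnable close,
                                     Runnable wipe,
                                     boolean wipeStateStore,
                                     AtomicReference<RuntimeException> firstException) {
        boolean wiped = false;
        try {
            close.run();
        } catch (RuntimeException e) {
            firstException.compareAndSet(null, e);
        } finally {
            if (wipeStateStore) {
                wipe.run();
                wiped = true;
            }
        }
        return wiped;
    }

    public static void main(String[] args) {
        AtomicReference<RuntimeException> firstException = new AtomicReference<>();
        boolean wiped = closeAndMaybeWipe(
                () -> { throw new IllegalStateException("close failed"); },
                () -> System.out.println("wiping task directory"),
                true,
                firstException);
        System.out.println(wiped + " " + firstException.get().getMessage());
    }
}
```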









[jira] [Assigned] (KAFKA-10213) Prefer --bootstrap-server in ducktape tests for Kafka clients

2020-07-09 Thread Vinoth Chandar (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinoth Chandar reassigned KAFKA-10213:
--

Assignee: Ron Dagostino  (was: Vinoth Chandar)

> Prefer --bootstrap-server in ducktape tests for Kafka clients
> -
>
> Key: KAFKA-10213
> URL: https://issues.apache.org/jira/browse/KAFKA-10213
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vinoth Chandar
>Assignee: Ron Dagostino
>Priority: Major
>






[GitHub] [kafka] C0urante commented on pull request #8973: KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once

2020-07-09 Thread GitBox


C0urante commented on pull request #8973:
URL: https://github.com/apache/kafka/pull/8973#issuecomment-656247489


   @gharris1727 @chia7712 @ncliang would any of you be interested in reviewing 
this?







[GitHub] [kafka] C0urante commented on pull request #9003: KAFKA-10240: Stop throwing WakeupExceptions during sink task shutdown

2020-07-09 Thread GitBox


C0urante commented on pull request #9003:
URL: https://github.com/apache/kafka/pull/9003#issuecomment-656247517


   @gharris1727 @chia7712 @ncliang would any of you be interested in reviewing 
this?







[jira] [Updated] (KAFKA-10235) Fix flaky transactions_test.py

2020-07-09 Thread Ismael Juma (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-10235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-10235:

Fix Version/s: (was: 3.0.0)
   2.7.0

> Fix flaky transactions_test.py
> --
>
> Key: KAFKA-10235
> URL: https://issues.apache.org/jira/browse/KAFKA-10235
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.7.0
>
>
> {code}
> =hard_bounce.bounce_target=clients.check_order=False.use_group_metadata=False:
>  FAIL: copier-1 : Message copier didn't make enough progress in 30s. Current progress: 0
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 134, in run
>     data = self.run_test()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 192, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 429, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 254, in test_transactions
>     num_messages_to_copy=self.num_seed_messages, use_group_metadata=use_group_metadata)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 195, in copy_messages_transactionally
>     self.bounce_copiers(copiers, clean_shutdown)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 120, in bounce_copiers
>     % (copier.transactional_id, str(copier.progress_percent(
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/utils/util.py", line 41, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
> TimeoutError: copier-1 : Message copier didn't make enough progress in 30s. Current progress: 0
> {code}





[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-09 Thread GitBox


vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-656247859


   Looks like there was a spurious failure of the new test:
   ```
   test_id:    kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.to_version=2.5.0-SNAPSHOT.from_version=2.0.1.bounce_type=full
   status:     FAIL
   run time:   1 minute 43.164 seconds
   
   
   Server connection dropped: 
   Traceback (most recent call last):
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 134, in run
       data = self.run_test()
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 192, in run_test
       return self.test_context.function(self.test)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/mark/_mark.py", line 429, in wrapper
       return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_application_upgrade_test.py", line 108, in test_app_upgrade
       self.restart_all_nodes_with(to_version)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_application_upgrade_test.py", line 173, in restart_all_nodes_with
       self.processor2.start_node(self.processor2.node)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/streams.py", line 305, in start_node
       node.account.create_file(self.CONFIG_FILE, prop_file)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/cluster/remoteaccount.py", line 588, in create_file
       with self.sftp_client.open(path, "w") as f:
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 372, in open
       t, msg = self._request(CMD_OPEN, filename, imode, attrblock)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 813, in _request
       return self._read_response(num)
     File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 845, in _read_response
       raise SSHException("Server connection dropped: {}".format(e))
   SSHException: Server connection dropped: 
   ```
   I'll re-run it.







[GitHub] [kafka] C0urante opened a new pull request #9003: KAFKA-10240: Stop throwing WakeupExceptions during sink task shutdown

2020-07-09 Thread GitBox


C0urante opened a new pull request #9003:
URL: https://github.com/apache/kafka/pull/9003


   A benign `WakeupException` can be thrown by a sink task's consumer if it's 
scheduled for shutdown by the worker. This is caught and handled gracefully if 
the exception is thrown when calling `poll` on the consumer, but not when calling 
`commitSync`, which is invoked by a task during shutdown and also when its 
partition assignment is updated.
   
   If thrown during a partition assignment update, the `WakeupException` is 
caught and handled gracefully as part of the task's `iteration` loop. If thrown 
during shutdown, however, it is not caught and instead leads to the 
scary-looking log message "Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted.".
   
   These changes catch the `WakeupException` during shutdown and handle it 
gracefully with a `TRACE`-level log message.
   
   A unit test is added to verify this behavior by simulating a thrown 
`WakeupException` during `Consumer::commitSync`, running through the 
`WorkerSinkTask::execute` method, and confirming that it does not throw a 
`WakeupException` itself.
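
The shutdown handling described above might be sketched as follows. This is not the `WorkerSinkTask` code: `ShutdownCommitSketch` is hypothetical, its nested `WakeupException` is a local stand-in for the Kafka exception so the example compiles on its own, and the `traceLog` list stands in for a `TRACE`-level logger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the nested exception is a local stand-in for Kafka's WakeupException.
class ShutdownCommitSketch {

    static class WakeupException extends RuntimeException {
    }

    // Attempt the final commit during shutdown. A wakeup at this point is benign,
    // so it is recorded quietly instead of escaping as an uncaught task failure.
    static boolean commitOnShutdown(Runnable commitSync, List<String> traceLog) {
        try {
            commitSync.run();
            return true;
        } catch (WakeupException e) {
            traceLog.add("Ignoring WakeupException during shutdown commit");
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> traceLog = new ArrayList<>();
        boolean committed = commitOnShutdown(() -> { throw new WakeupException(); }, traceLog);
        System.out.println(committed + " " + traceLog);
    }
}
```

Other exception types still propagate, so genuine commit failures remain visible.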
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-10174) Prefer --bootstrap-server ducktape tests using kafka_configs.sh

2020-07-09 Thread Vinoth Chandar (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154741#comment-17154741 ]

Vinoth Chandar commented on KAFKA-10174:


[https://github.com/apache/kafka/pull/8948]

 

> Prefer --bootstrap-server ducktape tests using kafka_configs.sh
> ---
>
> Key: KAFKA-10174
> URL: https://issues.apache.org/jira/browse/KAFKA-10174
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>






[GitHub] [kafka] omkreddy commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-07-09 Thread GitBox


omkreddy commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-656246737


   I agree, it's a useful fix. I think it's OK to break existing links. I have 
not seen those links used anywhere. Let's see what others think.






