[GitHub] [kafka] guozhangwang commented on pull request #12337: KAFKA-10199: Remove main consumer from store changelog reader

2022-07-06 Thread GitBox


guozhangwang commented on PR #12337:
URL: https://github.com/apache/kafka/pull/12337#issuecomment-1177071757

   > @guozhangwang Didn’t we decide to use “requireStable” in the KIP?
   
   I must have reverted that commit by mistake when I tried to revert the 
scripting-related code; I will file a hotfix commit to fix it.





[GitHub] [kafka] showuon commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

2022-07-06 Thread GitBox


showuon commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1176986634

   >  I think this will unfence the broker at startup even if the broker hasn't 
applied the snapshot or any of the log records, right?
   
   Currently, we replay the metadata records when the metadata listener gets 
new records. So yes, if we just return the current LEO, the records/snapshots 
might not have been applied yet. 
   
   Sorry, it's easy to reject others' proposals but difficult to come up with 
another solution. If we don't have any better alternative, maybe we can try 
the originally proposed one?
   ```
   One solution to this problem is to require the broker to only catch up to 
the last committed offset from when it last sent a heartbeat. For example:
   
   Broker sends a heartbeat with current offset of Y. The last committed 
offset is X. The controller remembers this last committed offset; call it X'.
   Broker sends another heartbeat with current offset of Z. Unfence the 
broker if Z >= X or Z >= X'.
   ```
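   
   For illustration, a minimal Java sketch of that check (names here are 
illustrative, not taken from the actual controller code):
   ```java
   // z  = broker's current offset reported in the latest heartbeat
   // x  = the current last committed offset
   // xP = the last committed offset remembered at the previous heartbeat (X')
   static boolean shouldUnfence(final long z, final long x, final long xP) {
       return z >= x || z >= xP;
   }
   ```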
   
   And again, thanks for continuing to try to fix this difficult issue, 
@dengziming!





[GitHub] [kafka] dajac commented on pull request #12337: KAFKA-10199: Remove main consumer from store changelog reader

2022-07-06 Thread GitBox


dajac commented on PR #12337:
URL: https://github.com/apache/kafka/pull/12337#issuecomment-1176984813

   @guozhangwang Didn’t we decide to use “requireStable” in the KIP?





[GitHub] [kafka] dajac commented on a diff in pull request #12337: KAFKA-10199: Remove main consumer from store changelog reader

2022-07-06 Thread GitBox


dajac commented on code in PR #12337:
URL: https://github.com/apache/kafka/pull/12337#discussion_r915401645


##
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java:
##
@@ -44,10 +45,21 @@ public ListConsumerGroupOffsetsOptions topicPartitions(List<TopicPartition> topicPartitions)
         return this;
     }
 
+    /**
+     * Sets an optional requireStable flag.
+     */
+    public void requireStable(final boolean requireStable) {
+        this.requireStable = requireStable;
+    }
+
     /**
      * Returns a list of topic partitions to add as part of the result.
      */
     public List<TopicPartition> topicPartitions() {
         return topicPartitions;
     }
+
+    public boolean shouldRequireStable() {

Review Comment:
   Didn’t we settle on using requireStable in the KIP?
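   
   A minimal usage sketch, assuming the setter keeps the `requireStable` name 
from the diff above (the getter's name is what this question is about):
   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
   import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;

   // "my-group" is a placeholder group id.
   static ListConsumerGroupOffsetsResult stableOffsets(final Admin admin) {
       final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions();
       options.requireStable(true); // ask the broker to return only stable offsets
       return admin.listConsumerGroupOffsets("my-group", options);
   }
   ```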






[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

2022-07-06 Thread GitBox


showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r915386298


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -1854,6 +1853,33 @@ class LogCleanerTest {
 } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time) {
+      // shutdown() and startup() are called in LogCleaner.reconfigure().
+      // Empty startup() and shutdown() to ensure that no unnecessary log cleaner threads remain after this test.
+      override def startup(): Unit = {}
+      override def shutdown(): Unit = {}
+    }
+
+    try {
+      assertEquals(1000, logCleaner.throttler.desiredRatePerSec,
+        s"Throttler.desiredRatePerSec should be initialized from initial `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+
+      val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+      newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 2000)
+
+      logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+      assertEquals(2000, logCleaner.throttler.desiredRatePerSec,
+        s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+    } finally logCleaner.shutdown();

Review Comment:
   nit: (1) no semicolon is needed, and (2) the format in Kafka is usually like this:
   ```
   } finally {
     logCleaner.shutdown()
   }
   ```






[GitHub] [kafka] mjsax commented on a diff in pull request #12188: KAFKA-10892: Shared Readonly State Stores

2022-07-06 Thread GitBox


mjsax commented on code in PR #12188:
URL: https://github.com/apache/kafka/pull/12188#discussion_r915376733


##
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+/**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *

Review Comment:
   nit: do we need a `<p>` tag here?



##
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+/**
+ * Adds a Read Only {@link StateStore} to the topology.

Review Comment:
   nit: `read-only`



##
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+/**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *
+ * A Read Only StateStore can use any compacted topic as a changelog.

Review Comment:
   Proposal:
   ```
   A read-only state store uses its input topic for fault-tolerance. Thus, in 
contrast to regular state stores, it must never create an internal changelog 
topic. Therefore, the input topic should be configured with log compaction.
   ```



##
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+/**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *
+ * A Read Only StateStore can use any compacted topic as a changelog.
+ * 
+ * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+ *
+ * @param storeBuilder  user defined key value store builder

Review Comment:
   If we are limited to kv-store, should we change the type to 
`StoreBuilder<KeyValueStore>` (or similar)?



##
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##
@@ -737,6 +737,91 @@ public synchronized Topology addStateStore(final StoreBuilder<?> storeBuilder,
         return this;
     }
 
+/**
+ * Adds a Read Only {@link StateStore} to the topology.
+ *
+ * A Read Only StateStore can use any compacted topic as a changelog.
+ * 
+ * A {@link SourceNode} will be added to consume the data arriving from 
the partitions of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create an {@link 
ProcessorNode} that will receive all
+ * records forwarded from the {@link SourceNode}.
+ * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
+ *
+     * @param storeBuilder          user defined key value store builder
+     * @param sourceName            name of the {@link SourceNode} that will be automatically added
+     * @param timestampExtractor    the stateless timestamp extractor used for this source,
+     *                              if not specified the default extractor defined in the configs will be used
+     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
+     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
+     * @param topic                 the topic to source the data from
+     * @param processorName         the name of the {@link ProcessorSupplier}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder<?> storeBuilder,
+                                                                  final String sourceName,
+                                                                  final TimestampExtractor timestampExtractor,
+                                                                  final Deserializer<KIn> keyDeserializer,
+                                                                  final Deserializer<VIn> valueDeserializer,
+                                                                  final String topic,
+                                                                  final String processorName,
+                                                                  final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
+        if (storeBuilder.loggingEnabled()) {
+            // -- disabling logging. We might want to print some logging.
+ 
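
   For reference, a hedged usage sketch of the proposed API (generics 
reconstructed as above; all topic, store, and node names are illustrative, and 
the input topic should be log-compacted per the proposal above):
   ```java
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.apache.kafka.streams.Topology;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.Stores;

   final Topology topology = new Topology();
   topology.addReadOnlyStateStore(
       Stores.keyValueStoreBuilder(
               Stores.persistentKeyValueStore("read-only-store"),
               Serdes.String(), Serdes.String())
           .withLoggingDisabled(), // a read-only store must not get its own changelog
       "store-source",
       null, // use the default timestamp extractor
       new StringDeserializer(),
       new StringDeserializer(),
       "compacted-input-topic",
       "store-updater",
       () -> new Processor<String, String, Void, Void>() {
           private KeyValueStore<String, String> store;

           @Override
           public void init(final ProcessorContext<Void, Void> context) {
               store = context.getStateStore("read-only-store");
           }

           @Override
           public void process(final Record<String, String> record) {
               store.put(record.key(), record.value()); // keep the store up-to-date
           }
       });
   ```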

[GitHub] [kafka] guozhangwang opened a new pull request, #12387: KAFKA-10199: Add RESUME in task updater

2022-07-06 Thread GitBox


guozhangwang opened a new pull request, #12387:
URL: https://github.com/apache/kafka/pull/12387

   This should be reviewed after https://github.com/apache/kafka/pull/12386.
   
   1) Need to check `enforceRestoreActive` / `transitToUpdateStandby` when 
resuming a paused task.
   2) Do not expose another `getResumedTasks`, since I think its caller only 
needs `getPausedTasks`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] guozhangwang opened a new pull request, #12386: KAFKA-10199: Add PAUSE in task updater

2022-07-06 Thread GitBox


guozhangwang opened a new pull request, #12386:
URL: https://github.com/apache/kafka/pull/12386

   1. Add the pause action to the task updater.
   2. When removing a task, also check the paused tasks in addition to the 
removed tasks.
   3. I also realized we do not check whether tasks with the same id are 
added, so I added that check in this PR as well.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] mjsax commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

2022-07-06 Thread GitBox


mjsax commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r915364409


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java:
##
@@ -289,7 +289,10 @@ private  StoreBuilder> 
materialize(final MaterializedInt
 // do not enable cache if the emit final strategy is used
 if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
 builder.withCachingEnabled();
+} else {
+builder.withCachingDisabled();

Review Comment:
   Why do we need this call? I thought we were only adding tests in this PR? 
Is the feature not complete yet, and do we need this?






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-06 Thread GitBox


guozhangwang commented on code in PR #12338:
URL: https://github.com/apache/kafka/pull/12338#discussion_r915368085


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -337,7 +369,7 @@ void addToSuccessfullyProcessed(final Task task) {
 successfullyProcessed.add(task);
 }
 
-void removeTaskFromCuccessfullyProcessedBeforeClosing(final Task task) {
+void removeTaskFromSuccessfullyProcessedBeforeClosing(final Task task) {

Review Comment:
   Fixed a typo in function name.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12338: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-06 Thread GitBox


guozhangwang commented on code in PR #12338:
URL: https://github.com/apache/kafka/pull/12338#discussion_r915355254


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -83,7 +83,6 @@ class DefaultStateUpdaterTest {
 private final Time time = new MockTime(1L);
 private final StreamsConfig config = new StreamsConfig(configProps());
 private final ChangelogReader changelogReader = mock(ChangelogReader.class);
-private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };

Review Comment:
   This is a leftover from a previous commit; we do not need it anymore.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -34,29 +33,41 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.utils.Utils.filterMap;
+import static org.apache.kafka.common.utils.Utils.union;
+
+/**
+ * All tasks contained by the Streams instance.
+ *
+ * Note that these tasks are shared between the TaskManager (stream thread) 
and the StateUpdater (restore thread),
+ * i.e. all running active tasks are processed by the former and all restoring 
active tasks and standby tasks are
+ * processed by the latter.
+ */
 class Tasks {
 private final Logger log;
 private final TopologyMetadata topologyMetadata;
 
-    private final Map<TaskId, Task> allTasksPerId = Collections.synchronizedSortedMap(new TreeMap<>());
-    private final Map<TaskId, Task> readOnlyTasksPerId = Collections.unmodifiableMap(allTasksPerId);
-    private final Collection<Task> readOnlyTasks = Collections.unmodifiableCollection(allTasksPerId.values());
-
     // TODO: change type to `StreamTask`
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
+    // TODO: change type to `StandbyTask`
+    private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
+
+    // Tasks may have been assigned for a NamedTopology that is not yet known by this host. When that occurs we stash
+    // these unknown tasks until either the corresponding NamedTopology is added and we can create them at last, or
+    // we receive a new assignment and they are revoked from the thread.
+
+    // Tasks may have been assigned but not yet created because:
+    // 1. They are for a NamedTopology that is not yet known by this host.
+    // 2. They are to be recycled from an existing restoring task yet to be returned from the task updater.
+    //
+    // When that occurs we stash these pending tasks until either they are finally clear to be created,
+    // or they are revoked from a new assignment.
+    private final Map<String, Set<Task>> pendingActiveTasks = new HashMap<>();

Review Comment:
   This is mainly for case 1) in the description.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java:
##
@@ -182,7 +159,7 @@ public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
 partitions
 );
 
-final InternalProcessorContext context = new ProcessorContextImpl(
+final InternalProcessorContext context = new 
ProcessorContextImpl(

Review Comment:
   Minor cleanup.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -285,34 +286,46 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
         final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
         final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
+        final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
         final Comparator<Task> byId = Comparator.comparing(Task::id);
-        final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
 
-        // first rectify all existing tasks
+        tasks.purgePendingTasks(activeTasks.keySet(), standbyTasks.keySet());
+
+        // first rectify all existing tasks:
+        // 1. for tasks that are already owned, just resume and skip re-creating them
+        // 2. for tasks that have changed active/standby status, just recycle and skip re-creating them
+        // 3. otherwise, close them since they are no longer owned.
         for (final Task task : tasks.allTasks()) {
-            if (activeTasks.containsKey(task.id()) && task.isActive()) {
-                tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
-                activeTasksToCreate.remove(task.id());
-            } else if (standbyTasks.containsKey(task.id()) && !task.isActive()) {
-                tasks.updateInputPartitionsAndResume(task, standbyTasks.get(task.id()));
-                standbyTasksToCreate.remove(task.id());
-            } else if (activeTasks.containsKey(task.id()) || standbyTasks.containsKey(task.id())) {
-                // check for tasks that were owned 

[jira] [Commented] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2022-07-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563510#comment-17563510
 ] 

Matthias J. Sax commented on KAFKA-14049:
-

{quote}Null Values in the Stream for a Left Join would indicate a Tombstone 
Message
{quote}
This does not make sense to me. A KStream is an event/fact stream, and thus 
there are no delete/tombstone semantics. Only _changelogs_ have tombstone 
semantics, but changelogs are modeled as `KTables` in Kafka Streams.

I still tend to agree that the current semantics are not ideal, but it's hard 
to fix without introducing ambiguity (and the reason to drop these records is 
to avoid ambiguity).

I guess a current workaround would be to implement the join using a 
`transform()` (or similar); it should not be too complicated.
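
For illustration, a hedged sketch of that workaround, assuming the GlobalKTable's store is named "global-store" and is backed by a timestamped key-value store (all types and names here are illustrative):
{code:java}
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

static KStream<String, String> leftJoinKeepingNulls(final KStream<String, String> stream) {
    return stream.transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
        private TimestampedKeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            // global stores are readable from any processor without being connected
            store = (TimestampedKeyValueStore<String, String>) context.getStateStore("global-store");
        }

        @Override
        public String transform(final String key, final String value) {
            final ValueAndTimestamp<String> hit = store.get(key);
            final String tableValue = hit == null ? null : hit.value();
            // unlike the built-in join, a null stream value is not dropped here
            return value == null ? tableValue : value + "," + tableValue;
        }

        @Override
        public void close() { }
    });
}
{code}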

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Priority: Major
>  Labels: beginner, newbie
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.





[jira] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2022-07-06 Thread Matthias J. Sax (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14049 ]


Matthias J. Sax deleted comment on KAFKA-14049:
-

was (Author: mjsax):
Sounds like a duplicate of https://issues.apache.org/jira/browse/KAFKA-12317?

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Priority: Major
>  Labels: beginner, newbie
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.





[jira] [Commented] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2022-07-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563509#comment-17563509
 ] 

Matthias J. Sax commented on KAFKA-14049:
-

Sounds like a duplicate of https://issues.apache.org/jira/browse/KAFKA-12317?

> Relax Non Null Requirement for KStreamGlobalKTable Left Join
> 
>
> Key: KAFKA-14049
> URL: https://issues.apache.org/jira/browse/KAFKA-14049
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Saumya Gupta
>Priority: Major
>  Labels: beginner, newbie
>
> Null Values in the Stream for a Left Join would indicate a Tombstone Message 
> that needs to be propagated if not actually joined with the GlobalKTable 
> message, hence these messages should not be ignored.





[GitHub] [kafka] C0urante commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-06 Thread GitBox


C0urante commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r915350266


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.ws.rs.core.Response;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.niceMock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference<TestDTO> TEST_TYPE = new TypeReference<TestDTO>() {
+};
+private HttpClient httpClient;
+
+private static String toJsonString(Object obj) {
+try {
+return OBJECT_MAPPER.writeValueAsString(obj);
+} catch (JsonProcessingException e) {
+throw new RuntimeException(e);
+}
+}
+
+private static <T> RestClient.HttpResponse<T> httpRequest(HttpClient httpClient, TypeReference<T> typeReference) {
+return RestClient.httpRequest(
+httpClient, null, null, null, null, typeReference, null, null);
+}
+
+@BeforeEach
+public void mockSetup() {
+httpClient = niceMock(HttpClient.class);
+}
+
+@Test
+public void testSuccess() throws ExecutionException, InterruptedException, 
TimeoutException {
+int statusCode = Response.Status.OK.getStatusCode();
+String expectedResponse = toJsonString(new TestDTO("someContent"));
+setupHttpClient(statusCode, expectedResponse);
+
+RestClient.HttpResponse<TestDTO> httpResp = httpRequest(httpClient, TEST_TYPE);
+assertEquals(httpResp.status(), statusCode);
+assertEquals(toJsonString(httpResp.body()), expectedResponse);
+}
+
+@Test
+public void testNoContent() throws ExecutionException, 
InterruptedException, TimeoutException {
+int statusCode = Response.Status.NO_CONTENT.getStatusCode();
+setupHttpClient(statusCode, null);
+
+RestClient.HttpResponse<TestDTO> httpResp = httpRequest(httpClient, TEST_TYPE);
+assertEquals(httpResp.status(), statusCode);
+assertNull(httpResp.body());
+}
+
+@Test
+public void testError() throws ExecutionException, InterruptedException, 
TimeoutException {

Review Comment:
   I think we should hold off on using JUnit 5 for now; these changes are fine 
as they are.






[GitHub] [kafka] C0urante commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-06 Thread GitBox


C0urante commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r915347260


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference<TestDTO> TEST_TYPE = new TypeReference<TestDTO>() {
+};
+private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
+
+private static void assertIsInternalServerError(ConnectRestException e) {
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.statusCode());
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.errorCode());
+}
+
+private static SecretKey getMockSecretKey() {
+SecretKey mockKey = mock(SecretKey.class);
+when(mockKey.getFormat()).thenReturn("RAW");// supported format by
+
when(mockKey.getEncoded()).thenReturn("SomeKey".getBytes(StandardCharsets.UTF_8));
+return mockKey;
+}
+
+private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient, String requestSignatureAlgorithm) {
+return RestClient.httpRequest(
+httpClient,
+"https://localhost:1234/api/endpoint;,
+"GET",
+null,
+new TestDTO("requestBodyData"),
+TEST_TYPE,
+MOCK_SECRET_KEY,
+requestSignatureAlgorithm);
+}
+
+private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient) {
+String validRequestSignatureAlgorithm = "HmacSHA1";
+return httpRequest(httpClient, validRequestSignatureAlgorithm);
+}
+
+
+@RunWith(Parameterized.class)
+public static class RequestFailureParameterizedTest {
+private final HttpClient httpClient = mock(HttpClient.class);
+
+@Parameterized.Parameter
+public Throwable requestException;
+
+@Parameterized.Parameters
+public static Collection requestExceptions() {
+return Arrays.asList(new Object[][]{
+{new InterruptedException()},
+{new ExecutionException(null)},
+{new TimeoutException()}
+});
+}
+
+private static Request buildThrowingMockRequest(Throwable t) throws 
ExecutionException, InterruptedException, TimeoutException {
+Request req = mock(Request.class);
+when(req.send()).thenThrow(t);
+return req;
+}
+
+@Test
+public void testFailureDuringRequestCausesInternalServerError() 

[GitHub] [kafka] guozhangwang merged pull request #12337: KAFKA-10199: Remove main consumer from store changelog reader

2022-07-06 Thread GitBox


guozhangwang merged PR #12337:
URL: https://github.com/apache/kafka/pull/12337





[GitHub] [kafka] stan-confluent commented on a diff in pull request #12120: Add mini test

2022-07-06 Thread GitBox


stan-confluent commented on code in PR #12120:
URL: https://github.com/apache/kafka/pull/12120#discussion_r915331214


##
tests/kafkatest/tests/core/mini_test.py:
##
@@ -0,0 +1,18 @@
+from ducktape.tests.test import Test
+from ducktape.mark.resource import cluster
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+
+
+class MiniTest(Test):
+def __init__(self, test_context):
+super(MiniTest, self).__init__(test_context=test_context)
+
+self.zk = ZookeeperService(test_context, 1)
+self.kafka = KafkaService(test_context, 1, self.zk)
+
+@cluster(num_nodes=2)
+def test(self):
+self.zk.start()

Review Comment:
   Yeah, sorry, this slipped past my radar. It's a good idea, and I'll add it - 
but how can we push this PR to older branches if it uses KRaft? Would it have 
to be a separate PR for older versions?






[GitHub] [kafka] hachikuji commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating

2022-07-06 Thread GitBox


hachikuji commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r915323373


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##
@@ -45,26 +45,37 @@ class BrokerMetadataSnapshotter(
*/
   private var _currentSnapshotOffset = -1L
 
+  /**
+   * The offset of the newest snapshot, or -1 if there hasn't been one. 
Accessed only under
+   * the object lock.
+   */
+  private var _latestSnapshotOffset = -1L

Review Comment:
   Do we need to initialize this on startup? If we don't, is it possible that 
we would end up snapshotting the same offset more than once?
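   
   For illustration, a hedged Java rendering of the guard being discussed (the 
actual class is Scala; names follow the diff, and whether `latestSnapshotOffset` 
gets initialized from disk at startup is exactly the open question):
   ```java
   // Minimal sketch: if latestSnapshotOffset starts at -1 while a snapshot
   // already exists on disk, the first call could re-snapshot a covered offset.
   class SnapshotGuard {
       private long currentSnapshotOffset = -1L; // offset being snapshotted, or -1
       private long latestSnapshotOffset = -1L;  // newest completed snapshot, or -1

       synchronized boolean maybeStartSnapshot(final long committedOffset) {
           if (currentSnapshotOffset != -1) {
               return false; // a snapshot is already in progress
           }
           if (committedOffset <= latestSnapshotOffset) {
               return false; // this offset is already covered by a snapshot
           }
           currentSnapshotOffset = committedOffset;
           return true;
       }
   }
   ```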



##
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##
@@ -45,26 +45,37 @@ class BrokerMetadataSnapshotter(
*/
   private var _currentSnapshotOffset = -1L
 
+  /**
+   * The offset of the newest snapshot, or -1 if there hasn't been one. 
Accessed only under
+   * the object lock.
+   */
+  private var _latestSnapshotOffset = -1L
+
   /**
* The event queue which runs this listener.
*/
   val eventQueue = new KafkaEventQueue(time, logContext, 
threadNamePrefix.getOrElse(""))
 
   override def maybeStartSnapshot(lastContainedLogTime: Long, image: 
MetadataImage): Boolean = synchronized {
-if (_currentSnapshotOffset == -1L) {
+if (_currentSnapshotOffset != -1) {
+  warn(s"Declining to create a new snapshot at 
${image.highestOffsetAndEpoch()} because " +

Review Comment:
   Do you think the log level is right for these messages? It seems like these 
are "normal" things that can happen and not an indication of a problem. Perhaps 
we can make them info or debug level.






[GitHub] [kafka] methodmissing opened a new pull request, #12385: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)

2022-07-06 Thread GitBox


methodmissing opened a new pull request, #12385:
URL: https://github.com/apache/kafka/pull/12385

   [KIP 
511](https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers)
 introduced a 
[ClientInformation](https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/network/ClientInformation.java)
 class that wraps software (client) name and version and is also set as a 
property on 
[RequestContext](https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java),
 except unfortunately there's no getter to retrieve this information.
   
   The 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers)
 implemented protocol support, a registry to set it at the network layer per 
session and integrated with 
[RequestContext](https://github.com/apache/kafka/blob/d35283f011a797902fc9c4d896a1a6f039eb7d06/clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java#L101),
 but unfortunately the only "public API" for this information is the broker 
request logs.
   
   This change also exposes client information to custom authorizers via 
`RequestContext`, where it can be used programmatically in a pluggable fashion.
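   
   A hedged sketch of how a pluggable authorizer might consume this (the 
`clientInformation()` getter is the accessor this PR proposes; its final shape 
may differ):
   ```java
   import org.apache.kafka.common.network.ClientInformation;
   import org.apache.kafka.common.requests.RequestContext;
   import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

   final class ClientInfoHelper {
       // Returns "name/version" of the client software behind a request context.
       static String describeClient(final AuthorizableRequestContext ctx) {
           if (ctx instanceof RequestContext) {
               final ClientInformation info = ((RequestContext) ctx).clientInformation();
               return info.softwareName() + "/" + info.softwareVersion();
           }
           return "unknown"; // fall back for other context implementations
       }
   }
   ```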
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-14052) Download verification directions are incorrect for linux

2022-07-06 Thread M Sesterhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M Sesterhenn updated KAFKA-14052:
-
Description: 
[https://www.apache.org/info/verification.html]

The above is linked to from the kafka download page 
([https://kafka.apache.org/downloads]), and it contains incorrect instructions 
for verifying the release.

The .sha512 files for the downloads are all in this format:
{code:java}
kafka_2.13-3.2.0.tgz: 736A1298 23B058DC 10788D08 93BDE47B 6F39B9E4 972F9EAC 
2D5C9E85 E51E4773 44C6F1E1 EBD126CE 34D5FD43 0EB07E55 FDD60D60 CB541F1D 
48655C0E BC0A4778 
{code}
These files cannot be used to easily verify the expected hash using the 
procedure described in the verification website.  The website says to use:
{code:java}
sha512sum file {code}
...which doesn't do any hash comparison; it only tells you what the file's hash 
is, and it is up to the user to manually compare its output with the 
differently formatted output in the .sha512 file, which is error-prone and a 
chore.

Expected result:

I would expect to be able to do 
{code:java}
sha512sum -c file{code}
...like any normal download.

If the format of the .sha512 files cannot be changed to be compatible with the 
linux shasum program, then please update the website to describe the proper way 
to compare hashes.  The best way seems to be a script like this:
{code:java}
SHA=$(mktemp); gpg --print-md SHA512 $FILE > $SHA && diff $SHA $FILE.sha512 && 
echo "SHA checks out OK."
{code}
(where FILE is the downloaded tarball.)

I looked into providing a PR for the verification page, but that is an 
Apache-wide web page and probably is not publicly available.

  was:
[https://www.apache.org/info/verification.html]

The above is linked to from the kafka download page 
([https://kafka.apache.org/downloads]), and it contains incorrect instructions 
for verifying the release.

The .sha512 files for the downloads are all in this format:

 
{code:java}
kafka_2.13-3.2.0.tgz: 736A1298 23B058DC 10788D08 93BDE47B 6F39B9E4 972F9EAC 
2D5C9E85 E51E4773 44C6F1E1 EBD126CE 34D5FD43 0EB07E55 FDD60D60 CB541F1D 
48655C0E BC0A4778 
{code}
These files cannot be used to easily verify the expected hash using the 
procedure described in the verification website.  The website says to use:
{code:java}
sha512sum file {code}
...which doesn't do any hash comparison; it only tells you what the file's hash 
is, and it is up to the user to manually compare its output with the 
differently formatted output in the .sha512 file, which is error-prone and a 
chore.

Expected result:

I would expect to be able to do 
{code:java}
sha512sum -c file{code}
...like any normal download.

 

If the format of the .sha512 files cannot be changed to be compatible with the 
linux shasum program, then please update the website to describe the proper way 
to compare hashes.  The best way seems to be a script like this:

 
{code:java}
SHA=$(mktemp); gpg --print-md SHA512 $FILE > $SHA && diff $SHA $FILE.sha512 && 
echo "SHA checks out OK."
{code}
(where FILE is the downloaded tarball.)

I looked into providing a PR for the verification page, but that is an 
Apache-wide web page and probably is not publicly available.

 

 


> Download verification directions are incorrect for linux
> 
>
> Key: KAFKA-14052
> URL: https://issues.apache.org/jira/browse/KAFKA-14052
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
> Environment: website
>Reporter: M Sesterhenn
>Priority: Major
>
> [https://www.apache.org/info/verification.html]
> The above is linked to from the kafka download page 
> ([https://kafka.apache.org/downloads]), and it contains incorrect 
> instructions for verifying the release.
> The .sha512 files for the downloads are all in this format:
> {code:java}
> kafka_2.13-3.2.0.tgz: 736A1298 23B058DC 10788D08 93BDE47B 6F39B9E4 972F9EAC 
> 2D5C9E85 E51E4773 44C6F1E1 EBD126CE 34D5FD43 0EB07E55 FDD60D60 CB541F1D 
> 48655C0E BC0A4778 
> {code}
> These files cannot be used to easily verify the expected hash using the 
> procedure described in the verification website.  The website says to use:
> {code:java}
> sha512sum file {code}
> ...which doesn't do any hash comparison; it only tells you what the file's 
> hash is, and it is up to the user to manually compare its output with the 
> differently formatted output in the .sha512 file, which is error-prone and a 
> chore.
> Expected result:
> I would expect to be able to do 
> {code:java}
> sha512sum -c file{code}
> ...like any normal download.
> If the format of the .sha512 files cannot be changed to be compatible with 
> the linux shasum program, then please update the website to describe the 
> proper way to compare hashes.  The best way seems to be a script like this:
> {code:java}
> SHA=$(mktemp); 

[jira] [Created] (KAFKA-14052) Download verification directions are incorrect for linux

2022-07-06 Thread M Sesterhenn (Jira)
M Sesterhenn created KAFKA-14052:


 Summary: Download verification directions are incorrect for linux
 Key: KAFKA-14052
 URL: https://issues.apache.org/jira/browse/KAFKA-14052
 Project: Kafka
  Issue Type: Bug
  Components: documentation
 Environment: website
Reporter: M Sesterhenn


[https://www.apache.org/info/verification.html]

The above is linked to from the kafka download page 
([https://kafka.apache.org/downloads]), and it contains incorrect instructions 
for verifying the release.

The .sha512 files for the downloads are all in this format:

 
{code:java}
kafka_2.13-3.2.0.tgz: 736A1298 23B058DC 10788D08 93BDE47B 6F39B9E4 972F9EAC 
2D5C9E85 E51E4773 44C6F1E1 EBD126CE 34D5FD43 0EB07E55 FDD60D60 CB541F1D 
48655C0E BC0A4778 
{code}
These files cannot be used to easily verify the expected hash using the 
procedure described in the verification website.  The website says to use:
{code:java}
sha512sum file {code}
...which doesn't do any hash comparison; it only tells you what the file's hash 
is, and it is up to the user to manually compare its output with the 
differently formatted output in the .sha512 file, which is error-prone and a 
chore.

Expected result:

I would expect to be able to do 
{code:java}
sha512sum -c file{code}
...like any normal download.

 

If the format of the .sha512 files cannot be changed to be compatible with the 
linux shasum program, then please update the website to describe the proper way 
to compare hashes.  The best way seems to be a script like this:

 
{code:java}
SHA=$(mktemp); gpg --print-md SHA512 $FILE > $SHA && diff $SHA $FILE.sha512 && 
echo "SHA checks out OK."
{code}
(where FILE is the downloaded tarball.)

I looked into providing a PR for the verification page, but that is an 
Apache-wide web page and probably is not publicly available.

 

 





[jira] [Assigned] (KAFKA-14051) KRaft remote controllers do not create metrics reporters

2022-07-06 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino reassigned KAFKA-14051:
-

Assignee: Ron Dagostino

> KRaft remote controllers do not create metrics reporters
> 
>
> Key: KAFKA-14051
> URL: https://issues.apache.org/jira/browse/KAFKA-14051
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.3
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>
> KRaft remote controllers (KRaft nodes with the configuration value 
> process.roles=controller) do not create the configured metrics reporters 
> defined by the configuration key metric.reporters.  The reason is because 
> KRaft remote controllers are not wired up for dynamic config changes, and the 
> creation of the configured metric reporters actually happens during the 
> wiring up of the broker for dynamic reconfiguration, in the invocation of 
> DynamicBrokerConfig.addReconfigurables(KafkaBroker).





[jira] [Created] (KAFKA-14051) KRaft remote controllers do not create metrics reporters

2022-07-06 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-14051:
-

 Summary: KRaft remote controllers do not create metrics reporters
 Key: KAFKA-14051
 URL: https://issues.apache.org/jira/browse/KAFKA-14051
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.3
Reporter: Ron Dagostino


KRaft remote controllers (KRaft nodes with the configuration value 
process.roles=controller) do not create the configured metrics reporters 
defined by the configuration key metric.reporters.  The reason is because KRaft 
remote controllers are not wired up for dynamic config changes, and the 
creation of the configured metric reporters actually happens during the wiring 
up of the broker for dynamic reconfiguration, in the invocation of 
DynamicBrokerConfig.addReconfigurables(KafkaBroker).





[jira] [Resolved] (KAFKA-14032) Dequeue time for forwarded requests is ignored to set

2022-07-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14032.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

> Dequeue time for forwarded requests is ignored to set
> -
>
> Key: KAFKA-14032
> URL: https://issues.apache.org/jira/browse/KAFKA-14032
> Project: Kafka
>  Issue Type: Bug
>Reporter: Feiyan Yu
>Priority: Minor
> Fix For: 3.3.0
>
>
> It seems like `requestDequeueTimeNanos` is never set for forwarded requests.
> As a property of a `Request` object, `requestDequeueTimeNanos` is set only 
> when handlers poll and handle the request from `requestQueue`; however, 
> handlers poll an envelope request once but call the handle method twice, so 
> `requestDequeueTimeNanos` is never set for the parsed forwarded requests.
> The parsed envelope requests have `requestDequeueTimeNanos` = -1, which 
> affects the correctness of the `LocalTimeMs` statistics and metrics.





[GitHub] [kafka] hachikuji merged pull request #12360: KAFKA-14032: Dequeue time for forwarded requests is ignored to set

2022-07-06 Thread GitBox


hachikuji merged PR #12360:
URL: https://github.com/apache/kafka/pull/12360





[jira] [Commented] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch

2022-07-06 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563415#comment-17563415
 ] 

Jason Gustafson commented on KAFKA-14050:
-

This might be a false alarm. We may have been running a client with a version 
in between releases. In 2.7.0, the field has the right int64 type: 
[https://github.com/apache/kafka/blob/2.7.0/clients/src/main/resources/common/message/ApiVersionsResponse.json].
In 2.6, the field does not exist: 
[https://github.com/apache/kafka/blob/2.6/clients/src/main/resources/common/message/ApiVersionsResponse.json].

> Older clients cannot deserialize ApiVersions response with finalized feature 
> epoch
> --
>
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0
>
>
> When testing kraft locally, we encountered this exception from an older 
> client:
> {code:java}
> [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | 
> adminclient-1394] org.apache.kafka.common.utils.KafkaThread 
> lambda$configureThread$0 - Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1394':
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'api_keys': Error reading array of size 1207959552, only 579 bytes available
> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
> at 
> org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
> at 
> org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
> at 
> org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
> at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
> at java.base/java.lang.Thread.run(Thread.java:832) {code}
> The cause appears to be from a change to the type of the 
> `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
> int64: 
> [https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58.]
> Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this 
> by creating a new field. We will have to leave the existing tag in the 
> protocol spec and consider it dead.
> Credit for this find goes to [~dajac] .





[jira] [Updated] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch

2022-07-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14050:

Description: 
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] 
org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught 
exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'api_keys': Error reading array of size 1207959552, only 579 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
at 
org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
at 
org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
at 
org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
at java.base/java.lang.Thread.run(Thread.java:832) {code}
The cause appears to be from a change to the type of the 
`FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
int64: 
[https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58.]
 Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by 
creating a new field. We will have to leave the existing tag in the protocol 
spec and consider it dead.

Credit for this find goes to [~dajac] .

  was:
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] 
org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught 
exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'api_keys': Error reading array of size 1207959552, only 579 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
at 
org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
at 
org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
at 
org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
at java.base/java.lang.Thread.run(Thread.java:832) {code}
The cause appears to be from a change to the type of the 
`FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
int64: Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix 
this by creating a new field. We will have to leave the existing tag in the 
protocol spec and consider it dead.

Credit for this find goes to [~dajac] .


> Older clients cannot deserialize ApiVersions response with finalized feature 
> epoch
> --
>
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0
>
>
> When testing kraft locally, we encountered this exception from an older 
> client:
> {code:java}
> [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | 
> adminclient-1394] org.apache.kafka.common.utils.KafkaThread 
> lambda$configureThread$0 - Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1394':
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'api_keys': Error reading array of size 1207959552, only 579 bytes available
> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
> at 
> 

[jira] [Updated] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch

2022-07-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14050:

Description: 
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] 
org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught 
exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'api_keys': Error reading array of size 1207959552, only 579 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
at 
org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
at 
org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
at 
org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
at java.base/java.lang.Thread.run(Thread.java:832) {code}
The cause appears to be from a change to the type of the 
`FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
int64: 
[https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58.]

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by 
creating a new field. We will have to leave the existing tag in the protocol 
spec and consider it dead.

Credit for this find goes to [~dajac] .

  was:
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] 
org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught 
exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'api_keys': Error reading array of size 1207959552, only 579 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
at 
org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
at 
org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
at 
org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
at java.base/java.lang.Thread.run(Thread.java:832) {code}
The cause appears to be from a change to the type of the 
`FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
int64: 
[https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58.]
 Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by 
creating a new field. We will have to leave the existing tag in the protocol 
spec and consider it dead.

Credit for this find goes to [~dajac] .


> Older clients cannot deserialize ApiVersions response with finalized feature 
> epoch
> --
>
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0
>
>
> When testing kraft locally, we encountered this exception from an older 
> client:
> {code:java}
> [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | 
> adminclient-1394] org.apache.kafka.common.utils.KafkaThread 
> lambda$configureThread$0 - Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1394':
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'api_keys': Error reading array of size 

[jira] [Updated] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch

2022-07-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14050:

Description: 
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] 
org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught 
exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'api_keys': Error reading array of size 1207959552, only 579 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
at 
org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
at 
org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
at 
org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
at java.base/java.lang.Thread.run(Thread.java:832) {code}
The cause appears to be from a change to the type of the 
`FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
int64: Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix 
this by creating a new field. We will have to leave the existing tag in the 
protocol spec and consider it dead.

Credit for this find goes to [~dajac] .

  was:
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] 
org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught 
exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'api_keys': Error reading array of size 1207959552, only 579 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
at 
org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
at 
org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
at 
org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
at java.base/java.lang.Thread.run(Thread.java:832) {code}
The cause appears to be from a change to the type of the 
`FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
int64: 
[https://github.com/confluentinc/ce-kafka/commit/fb4f297207ef62f71e4a6d2d0dac75752933043d#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bL58.]

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by 
creating a new field. We will have to leave the existing tag in the protocol 
spec and consider it dead.

Credit for this find goes to [~dajac] .


> Older clients cannot deserialize ApiVersions response with finalized feature 
> epoch
> --
>
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0
>
>
> When testing kraft locally, we encountered this exception from an older 
> client:
> {code:java}
> [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | 
> adminclient-1394] org.apache.kafka.common.utils.KafkaThread 
> lambda$configureThread$0 - Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1394':
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'api_keys': Error reading array of size 1207959552, only 579 bytes available
> at 

[jira] [Updated] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch

2022-07-06 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14050:

Description: 
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] 
org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught 
exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'api_keys': Error reading array of size 1207959552, only 579 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
at 
org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
at 
org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
at 
org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
at java.base/java.lang.Thread.run(Thread.java:832) {code}
The cause appears to be from a change to the type of the 
`FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
int64: 
[https://github.com/confluentinc/ce-kafka/commit/fb4f297207ef62f71e4a6d2d0dac75752933043d#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bL58.]

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by 
creating a new field. We will have to leave the existing tag in the protocol 
spec and consider it dead.

Credit for this find goes to [~dajac] .

  was:
When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] 
org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught 
exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'api_keys': Error reading array of size 1207959552, only 579 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
at 
org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
at 
org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
at 
org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
at java.base/java.lang.Thread.run(Thread.java:832) {code}
The cause appears to be from a change to the type of the 
`FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
int64: 
[https://github.com/confluentinc/ce-kafka/commit/fb4f297207ef62f71e4a6d2d0dac75752933043d#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bL58.]

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by 
creating a new field. We will have to leave the existing tag in the protocol 
spec and consider it dead.

Credit for this find goes to [~dajac] .


> Older clients cannot deserialize ApiVersions response with finalized feature 
> epoch
> --
>
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0
>
>
> When testing kraft locally, we encountered this exception from an older 
> client:
> {code:java}
> [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | 
> adminclient-1394] org.apache.kafka.common.utils.KafkaThread 
> lambda$configureThread$0 - Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1394':
> 

[jira] [Created] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch

2022-07-06 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14050:
---

 Summary: Older clients cannot deserialize ApiVersions response 
with finalized feature epoch
 Key: KAFKA-14050
 URL: https://issues.apache.org/jira/browse/KAFKA-14050
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
 Fix For: 3.3.0


When testing kraft locally, we encountered this exception from an older client:
{code:java}
[ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | adminclient-1394] 
org.apache.kafka.common.utils.KafkaThread lambda$configureThread$0 - Uncaught 
exception in thread 'kafka-admin-client-thread | adminclient-1394':
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'api_keys': Error reading array of size 1207959552, only 579 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
at 
org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
at 
org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
at 
org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
at 
org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
at java.base/java.lang.Thread.run(Thread.java:832) {code}
The cause appears to be from a change to the type of the 
`FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
int64: 
[https://github.com/confluentinc/ce-kafka/commit/fb4f297207ef62f71e4a6d2d0dac75752933043d#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bL58.]

Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this by 
creating a new field. We will have to leave the existing tag in the protocol 
spec and consider it dead.

Credit for this find goes to [~dajac] .
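
For context on why a type change inside a tagged field can break older readers, here is a minimal toy sketch (plain ByteBuffer arithmetic, with varints simplified to single bytes; this is not Kafka's actual generated deserializer):
{code:java}
import java.nio.ByteBuffer;

public class TaggedFieldTypeSkew {
    public static void main(String[] args) {
        // Writer side (new broker): tag, payload size, then an int64 payload.
        ByteBuffer buf = ByteBuffer.allocate(32);
        buf.put((byte) 1);   // tag of the tagged field
        buf.put((byte) 8);   // declared payload size in bytes
        buf.putLong(42L);    // the epoch, now written as int64
        buf.putInt(7);       // the next field in the response
        buf.flip();

        // Reader side (old client): its compiled-in schema says int32, and if
        // it trusts that type for a known tag rather than the declared size,
        // it misaligns.
        byte tag = buf.get();
        byte size = buf.get();
        int epoch = buf.getInt(); // consumes only the high 4 of 8 payload bytes (0 here)
        int next = buf.getInt();  // garbage: the low half of the long (42), not the real next field (7)
        System.out.printf("tag=%d size=%d epoch=%d next=%d%n", tag, size, epoch, next);
    }
}
{code}
Once the cursor is a few bytes off, every later field is decoded from the wrong position, which is consistent with the nonsensical array size in the stack trace above.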



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

2022-07-06 Thread GitBox


jsancio commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1176530495

   > In this PR I tried your suggestion and it does solve this problem; 
however, this will make the logic in RaftClient very complex, we need to 
save more state in LeaderState, and it's also difficult to test
   
   @dengziming Do you have a diff for this solution? I am interested in this 
solution as it would work in both REMOTE and COLOCATED configurations for KRaft.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

2022-07-06 Thread GitBox


jsancio commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1176512896

   > > here is the code change: 
https://github.com/dengziming/kafka/tree/KAFKA-13959-2
   > 
   > Hey @dengziming, I took a look at the commits in this tree. Is this the 
only commit 
[dengziming@79dc8ec](https://github.com/dengziming/kafka/commit/79dc8ec423cd74fba462e934f89bdec3dcd8528d)?
 Can you maybe share a diff/compare? For example, something like 
[dengziming/kafka@30216ea...KAFKA-13959-2](https://github.com/dengziming/kafka/compare/30216ea1c58761e62f51af40033f24e3ae1c5c2a...KAFKA-13959-2)
   
   Never mind. I understand now. The broker sends the active controller the 
local LEO instead of the last applied offset by the broker listener. I think 
this will unfence the broker at startup even if the broker hasn't applied the 
snapshot or any of the log records, right?
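
   To make the distinction concrete, a toy model of the two offsets under 
discussion (illustrative names only, not Kafka's real classes):

```java
// The log end offset tracks what the raft client has fetched; the last
// applied offset tracks what the broker's metadata listener has replayed
// into its local state.
final class BrokerMetadataProgress {
    volatile long logEndOffset;      // advanced by fetching from the quorum
    volatile long lastAppliedOffset; // advanced by replaying records/snapshots

    // Heartbeating the LEO can unfence a broker that has fetched the log at
    // startup but not replayed any of it yet:
    long heartbeatOffsetUsingLeo() { return logEndOffset; }

    // Heartbeating the applied offset only unfences once replay catches up:
    long heartbeatOffsetUsingApplied() { return lastAppliedOffset; }
}
```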


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

2022-07-06 Thread GitBox


jsancio commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1176510158

   > here is the code change: 
https://github.com/dengziming/kafka/tree/KAFKA-13959-2
   
   Hey @dengziming, I took a look at the commits in this tree. Is this the 
only commit 
https://github.com/dengziming/kafka/commit/79dc8ec423cd74fba462e934f89bdec3dcd8528d?
 Can you maybe share a diff/compare? For example, something like 
https://github.com/dengziming/kafka/compare/30216ea1c58761e62f51af40033f24e3ae1c5c2a...KAFKA-13959-2
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lihaosky commented on pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

2022-07-06 Thread GitBox


lihaosky commented on PR #12166:
URL: https://github.com/apache/kafka/pull/12166#issuecomment-1176495887

   I can also take a look by the end of this week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12383: REVERT: Kip-770

2022-07-06 Thread GitBox


mjsax commented on code in PR #12383:
URL: https://github.com/apache/kafka/pull/12383#discussion_r915090310


##
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##
@@ -1256,44 +1255,6 @@ public void 
shouldThrowExceptionWhenClientTagValueExceedMaxLimit() {
 );
 }
 
-@Test
-@SuppressWarnings("deprecation")
-public void 
shouldUseStateStoreCacheMaxBytesWhenBothOldAndNewConfigsAreSet() {
-props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 100);
-props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
-final StreamsConfig config = new StreamsConfig(props);
-assertEquals(getTotalCacheSize(config), 100);
-}
-
-@Test
-@SuppressWarnings("deprecation")
-public void 
shouldUseCacheMaxBytesBufferingConfigWhenOnlyDeprecatedConfigIsSet() {
-props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10);
-final StreamsConfig config = new StreamsConfig(props);
-assertEquals(getTotalCacheSize(config), 10);
-}
-
-@Test
-public void shouldUseStateStoreCacheMaxBytesWhenNewConfigIsSet() {
-props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10);
-final StreamsConfig config = new StreamsConfig(props);
-assertEquals(getTotalCacheSize(config), 10);
-}
-
-@Test
-public void 
shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
-final StreamsConfig config = new StreamsConfig(props);
-assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
-}
-
-@Test
-public void testInvalidSecurityProtocol() {
-props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
-final ConfigException ce = assertThrows(ConfigException.class,
-() -> new StreamsConfig(props));
-
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
-}
-

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12383: REVERT: Kip-770

2022-07-06 Thread GitBox


mjsax commented on code in PR #12383:
URL: https://github.com/apache/kafka/pull/12383#discussion_r915088428


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##
@@ -55,7 +55,6 @@ public class RecordQueue {
 
 private final Sensor droppedRecordsSensor;
 private final Sensor consumedSensor;
-private long totalBytesBuffered;
 private long headRecordSizeInBytes;

Review Comment:
   I did keep this one, because there was a follow-up PR that depends on it, 
and I did not want to revert (or change) the other one.
   
   
https://github.com/apache/kafka/commit/a6c5a74fdbdce9a992b47706913c920902cda28c



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] MPeli commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2022-07-06 Thread GitBox


MPeli commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1176373885

   @divijvaidya, thank you for your help. The JDK 17 and Scala 2.13 builds and 
tests finished successfully. Can we now request a review from a committer on this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12309: KAFKA-14007: Invoking connect headers.close method on shutdown

2022-07-06 Thread GitBox


vamossagar12 commented on PR #12309:
URL: https://github.com/apache/kafka/pull/12309#issuecomment-1176357937

   @showuon , can you please review the changes? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12321: KAFKA-14012: Adding null checks for cases when closeQuietly was being passed a lambda object

2022-07-06 Thread GitBox


vamossagar12 commented on PR #12321:
URL: https://github.com/apache/kafka/pull/12321#issuecomment-1176357390

   @showuon , could you please review these changes whenever you get the chance?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12383: REVERT: Kip-770

2022-07-06 Thread GitBox


vamossagar12 commented on code in PR #12383:
URL: https://github.com/apache/kafka/pull/12383#discussion_r914702189


##
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##
@@ -137,54 +129,19 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
 maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 log.info("Topology {} is overriding {} to {}", topologyName, 
BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize);
 } else {
-// If the user hasn't explicitly set the 
buffered.records.per.partition config, then leave it unbounded
-// and rely on the input.buffer.max.bytes instead to keep the 
memory usage under control
-maxBufferedSize = 
globalAppConfigs.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
-? 
globalAppConfigs.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG) : 
-1;
+maxBufferedSize = 
globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 }
 
-final boolean stateStoreCacheMaxBytesOverridden = 
isTopologyOverride(STATESTORE_CACHE_MAX_BYTES_CONFIG, topologyOverrides);
-final boolean cacheMaxBytesBufferingOverridden = 
isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides);
-
-if (!stateStoreCacheMaxBytesOverridden && 
!cacheMaxBytesBufferingOverridden) {
-cacheSize = getTotalCacheSize(globalAppConfigs);
+if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, 
topologyOverrides)) {
+cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
+log.info("Topology {} is overriding {} to {}", topologyName, 
CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize);
 } else {
-if (stateStoreCacheMaxBytesOverridden && 
cacheMaxBytesBufferingOverridden) {
-cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-log.info("Topology {} is using both deprecated config {} and 
new config {}, hence {} is ignored and the new config {} (value {}) is used",
- topologyName,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- cacheSize);
-} else if (cacheMaxBytesBufferingOverridden) {
-cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
-log.info("Topology {} is using only deprecated config {}, and 
will be used to set cache size to {}; " +
- "we suggest setting the new config {} instead as 
deprecated {} would be removed in the future.",
- topologyName,
- CACHE_MAX_BYTES_BUFFERING_CONFIG,
- cacheSize,
- STATESTORE_CACHE_MAX_BYTES_CONFIG,
- CACHE_MAX_BYTES_BUFFERING_CONFIG);
-} else {
-cacheSize = getLong(STATESTORE_CACHE_MAX_BYTES_CONFIG);
-}
-
-if (cacheSize != 0) {
-log.warn("Topology {} is overriding cache size to {} but this 
will not have any effect as the "
- + "topology-level cache size config only controls 
whether record buffering is enabled "
- + "or disabled, thus the only valid override 
value is 0",
- topologyName, cacheSize);
-} else {
-log.info("Topology {} is overriding cache size to {}, record 
buffering will be disabled",
- topologyName, cacheSize);
-}
+cacheSize = 
globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG);
 }
 
 if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) {
 maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG);
-log.info("Topology {} is overriding {} to {}", topologyName, 
MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);
+log.info("Topology {} is overridding {} to {}", topologyName, 
MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs);

Review Comment:
   nit: typo in overriding.. I think that's how it was originally. Comment can 
be ignored :) 



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##
@@ -55,7 +55,6 @@ public class RecordQueue {
 
 private final Sensor droppedRecordsSensor;
 private final Sensor consumedSensor;
-private long totalBytesBuffered;
 private long headRecordSizeInBytes;

Review Comment:
   I think headRecordSizeInBytes was also added as part of the PR which should 
be removed:
   
   
https://github.com/apache/kafka/pull/11796/files#diff-2c19d764cad8fcbe7da8046cf0a01e525bc41a5e12e08e8c71d76c0f27ffc550R56



##

[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-06 Thread GitBox


elkkhan commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r914831552


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.jose4j.keys.HmacKey;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference<TestDTO> TEST_TYPE = new TypeReference<TestDTO>() {
+};
+
+private static void assertIsInternalServerError(ConnectRestException e) {
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.statusCode());
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.errorCode());
+}
+
+private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm) {
+return RestClient.httpRequest(
+httpClient,
+"https://localhost:1234/api/endpoint;,
+"GET",
+null,
+new TestDTO("requestBodyData"),
+TEST_TYPE,
+new HmacKey("HMAC".getBytes(StandardCharsets.UTF_8)),
+requestSignatureAlgorithm);
+}
+
+private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient 
httpClient) {
+String validRequestSignatureAlgorithm = "HmacMD5";
+return httpRequest(httpClient, validRequestSignatureAlgorithm);
+}
+
+
+@RunWith(Parameterized.class)
+public static class RequestFailureParameterizedTest {
+private static final HttpClient httpClient = mock(HttpClient.class);

Review Comment:
   `@Mock` can't be used since it needs a `MockitoJUnitRunner` - we already 
have the `Parameterized` runner here and we can't have multiple runners on a 
class. Good point about the `static` mock, I missed that.
   
   `Any static field saves its state for the duration of the JVM's execution 
(unless code changes its value, of course). JUnit uses one JVM for all of its 
tests, so, yes, static fields save state between tests.`
   
   Changing the field to non-static
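
   For reference, a minimal sketch of the non-static variant (a hypothetical 
test class, shown only to illustrate the per-test-instance lifecycle):

```java
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;

import org.eclipse.jetty.client.HttpClient;
import org.junit.Before;
import org.junit.Test;

public class FreshMockPerTestExample {
    // Non-static: JUnit creates a new test-class instance per test method,
    // so each case gets its own mock; a static field would keep its state
    // for the whole JVM and leak stubbing across parameterized runs.
    private HttpClient httpClient;

    @Before
    public void setUp() {
        httpClient = mock(HttpClient.class);
    }

    @Test
    public void eachTestSeesAFreshMock() {
        assertNotNull(httpClient); // stub per test here instead of statically
    }
}
```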
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #12380: MINOR: Get rid of agent checks in Jenkinsfile

2022-07-06 Thread GitBox


mumrah commented on PR #12380:
URL: https://github.com/apache/kafka/pull/12380#issuecomment-1176265122

   We can probably do without these agent checks. The idea behind them was to 
fail-fast when the node provider wasn't available (rather than waiting for the 
2hr timeout). But, the end result is pretty much the same (aborted build 
status).
   
   I wonder if we can set up a different Jenkins job that runs on trunk only 
for ARM and PPC. I doubt we have gained much from running these builds on each 
PR. Seems they mainly add noise to our build status.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] anekee666 commented on pull request #6329: KAFKA-1194: Fix renaming open files on Windows

2022-07-06 Thread GitBox


anekee666 commented on PR #6329:
URL: https://github.com/apache/kafka/pull/6329#issuecomment-1176258274

   Hi,
   
   it seems there is a leak in the kafka log segments area after the changes:
   
![image-2022-06-28-14-41-43-339](https://user-images.githubusercontent.com/98586737/177567524-c4fecb96-d84b-4683-bac3-347a44da5523.png)
   
   The memory increases over time, but I still haven't found the reason. Any ideas?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] elkkhan commented on pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-06 Thread GitBox


elkkhan commented on PR #12320:
URL: https://github.com/apache/kafka/pull/12320#issuecomment-1176251858

   @C0urante addressed the comments, I think the only outstanding one is 
whether we should add junit-jupiter-params to dep. list and use that for 
parameterised tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-06 Thread GitBox


elkkhan commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r914862459


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.jose4j.keys.HmacKey;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference<TestDTO> TEST_TYPE = new TypeReference<TestDTO>() {
+};
+
+private static void assertIsInternalServerError(ConnectRestException e) {
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.statusCode());
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.errorCode());
+}
+
+private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm) {
+return RestClient.httpRequest(
+httpClient,
+"https://localhost:1234/api/endpoint;,
+"GET",
+null,
+new TestDTO("requestBodyData"),
+TEST_TYPE,
+new HmacKey("HMAC".getBytes(StandardCharsets.UTF_8)),

Review Comment:
   replaced with a mock key



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-06 Thread GitBox


elkkhan commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r914831552


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.jose4j.keys.HmacKey;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference<TestDTO> TEST_TYPE = new TypeReference<TestDTO>() {
+};
+
+private static void assertIsInternalServerError(ConnectRestException e) {
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.statusCode());
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.errorCode());
+}
+
+private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm) {
+return RestClient.httpRequest(
+httpClient,
+"https://localhost:1234/api/endpoint;,
+"GET",
+null,
+new TestDTO("requestBodyData"),
+TEST_TYPE,
+new HmacKey("HMAC".getBytes(StandardCharsets.UTF_8)),
+requestSignatureAlgorithm);
+}
+
+private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient 
httpClient) {
+String validRequestSignatureAlgorithm = "HmacMD5";
+return httpRequest(httpClient, validRequestSignatureAlgorithm);
+}
+
+
+@RunWith(Parameterized.class)
+public static class RequestFailureParameterizedTest {
+private static final HttpClient httpClient = mock(HttpClient.class);

Review Comment:
   `@Mock` can't be used since it needs a `MockitoJUnitRunner`, but we already 
have the `Parameterized` runner here and we can't have multiple runners on a 
class. Good point about the `static` mock, I missed that.
   
   `Any static field saves its state for the duration of the JVM's execution 
(unless code changes its value, of course). JUnit uses one JVM for all of its 
tests, so, yes, static fields save state between tests.`
   
   Changing the field to non-static
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-06 Thread GitBox


elkkhan commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r914821168


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.ws.rs.core.Response;
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.niceMock;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference<TestDTO> TEST_TYPE = new TypeReference<TestDTO>() {
+};
+private HttpClient httpClient;
+
+private static String toJsonString(Object obj) {
+try {
+return OBJECT_MAPPER.writeValueAsString(obj);
+} catch (JsonProcessingException e) {
+throw new RuntimeException(e);
+}
+}
+
+private static <T> RestClient.HttpResponse<T> httpRequest(HttpClient 
httpClient, TypeReference<T> typeReference) {
+return RestClient.httpRequest(
+httpClient, null, null, null, null, typeReference, null, null);
+}
+
+@BeforeEach
+public void mockSetup() {
+httpClient = niceMock(HttpClient.class);
+}
+
+@Test
+public void testSuccess() throws ExecutionException, InterruptedException, 
TimeoutException {
+int statusCode = Response.Status.OK.getStatusCode();
+String expectedResponse = toJsonString(new TestDTO("someContent"));
+setupHttpClient(statusCode, expectedResponse);
+
+RestClient.HttpResponse<TestDTO> httpResp = httpRequest(httpClient, 
TEST_TYPE);
+assertEquals(httpResp.status(), statusCode);
+assertEquals(toJsonString(httpResp.body()), expectedResponse);
+}
+
+@Test
+public void testNoContent() throws ExecutionException, 
InterruptedException, TimeoutException {
+int statusCode = Response.Status.NO_CONTENT.getStatusCode();
+setupHttpClient(statusCode, null);
+
+RestClient.HttpResponse<TestDTO> httpResp = httpRequest(httpClient, 
TEST_TYPE);
+assertEquals(httpResp.status(), statusCode);
+assertNull(httpResp.body());
+}
+
+@Test
+public void testError() throws ExecutionException, InterruptedException, 
TimeoutException {

Review Comment:
   @C0urante would it be too much overhead to add junit-jupiter-params to the 
classpath? If not, I'd be happy to refactor this to use JUnit 5 parameterized 
tests; otherwise I think it can be left as is
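
   If the dependency were added, a sketch of what the JUnit 5 parameterized 
form could look like (hypothetical class and method names; only the 
parameterization plumbing is shown):

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class RestClientFailureModesTest {

    // One case per failure the underlying request can throw.
    static Stream<Class<? extends Throwable>> requestFailures() {
        return Stream.of(InterruptedException.class,
                         ExecutionException.class,
                         TimeoutException.class);
    }

    @ParameterizedTest
    @MethodSource("requestFailures")
    void surfacesEachFailureMode(Class<? extends Throwable> failure) {
        // The real test would stub the mocked HttpClient to throw `failure`
        // and assert the REST client maps it to an internal server error.
        assertTrue(Throwable.class.isAssignableFrom(failure));
    }
}
```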



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection

2022-07-06 Thread GitBox


divijvaidya commented on code in PR #12381:
URL: https://github.com/apache/kafka/pull/12381#discussion_r914730050


##
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala:
##
@@ -429,6 +430,20 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
   verifyProduceConsume(producer, consumer, 10, topic)
 }
 
+def verifyBrokerToControllerCall(controller: KafkaServer): Unit = {
+  val nonControllerBroker = servers.find(_.config.brokerId != 
controller.config.brokerId).get
+  val brokerToControllerManager = 
nonControllerBroker.clientToControllerChannelManager
+  val completionHandler = new TestControllerRequestCompletionHandler()
+  brokerToControllerManager.sendRequest(new MetadataRequest.Builder(new 
MetadataRequestData()), completionHandler)
+  TestUtils.waitUntilTrue(() => {
+completionHandler.completed.get() || completionHandler.timedOut.get()
+  }, "Timed out while waiting for broker to controller API call")
+  val response = completionHandler.actualResponse.getOrElse(throw new 
IllegalStateException("No response recorded even though request is completed"))

Review Comment:
   Both your comments make sense. I have made the changes as per your 
suggestions.



##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -189,6 +188,10 @@ class BrokerToControllerChannelManagerImpl(
 config.saslInterBrokerHandshakeRequestEnable,
 logContext
   )
+  channelBuilder match {
+case reconfigurable: Reconfigurable => 
config.addReconfigurable(reconfigurable)
+case _ =>

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563162#comment-17563162
 ] 

Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 11:27 AM:
-

So I think I have gotten somewhere. I tried adjusting various timeouts to no 
avail, so I moved on to another theory, which is that some topology/stream-based 
methods are asynchronous, i.e. they are doing something in the background even 
after the calling function has finished. 

To test this theory I have adjusted the flaky test by adding some manual 
Thread.sleep's to account for this supposed asynchronicity, i.e.
{code:java}
final AddNamedTopologyResult result1 = 
streams.addNamedTopology(topology2Client1);
Thread.sleep(500);
streams2.addNamedTopology(topology2Client2).all().get();
Thread.sleep(500);
result1.all().get();
Thread.sleep(500);{code}
This appears to be greatly reducing the flakiness. Originally I used 200 millis, 
which roughly increased the amount of time until failure by 4x; however, I still 
got a failure, so now I am testing it with the 500 millis as shown above.
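
As a side note on technique: polling for the condition is usually less 
timing-sensitive than a fixed sleep; a generic sketch (hypothetical helper, not 
the test's actual utilities):
{code:java}
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Retry a condition until it holds or a deadline passes, instead of guessing
// a fixed Thread.sleep duration up front.
final class WaitUntil {
    static void waitUntil(BooleanSupplier condition, Duration timeout) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadline) {
                throw new AssertionError("condition not met within " + timeout);
            }
            Thread.sleep(50); // short poll; total wait adapts to machine load
        }
    }
}
{code}
A check like this waits only as long as needed on a fast machine while still 
tolerating a slow one.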


was (Author: mdedetrich-aiven):
So I think I have gotten somewhere. I tried adjusting various timeouts to no 
avail so I moved onto another theory which is that some topology/stream based 
methods are asynchronous, i.e they are doing something in the background even 
after the calling function has finished. 

To test this theory I have adjusted the flaky test by adding some manual 
sleeps, i.e.
{code:java}
final AddNamedTopologyResult result1 = 
streams.addNamedTopology(topology2Client1);
Thread.sleep(500);
streams2.addNamedTopology(topology2Client2).all().get();
Thread.sleep(500);
result1.all().get();
Thread.sleep(500);{code}
This appears to be greatly reducing the flakiness. Originally I used 200 millis 
which roughly increased the amount of time until failure by 4x however I still 
got a failure so now I am testing it with the 500 millis as shown above.

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>  Labels: flaky-test
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> 

[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563004#comment-17563004
 ] 

Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 11:24 AM:
-

[~cadonna] So I did some debugging on this ticket over the past week and I 
found out some interesting things.

To start off with, I did manage to predictably replicate the test's flakiness, 
and it does appear to be related to load, i.e. the test is more flaky the 
fewer CPU resources it has. I am using Docker (i.e. running the tests within 
the Docker Gradle image), using the --cpus flag to limit resources.

Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less 
often. I attempted to increase the various timeouts that are used in the test, 
but this had no effect, so I am going to dig a bit further.

Note that 5 CPUs is already considered "high" (at least for a machine that 
would reasonably run Kafka Streams). The machine I am running on has 12 
logical CPUs (6 cores, 12 threads), and at least when running with all of the 
machine's resources I couldn't replicate the flaky test.


was (Author: mdedetrich-aiven):
[~cadonna] So I did some debugging on this ticket over the past week and I 
found out some interesting things.

To start off with, I did manage to predictably replicate the test's flakiness, 
and it does appear to be related to load, i.e. the test is more flaky the 
fewer CPU resources it has. I am using Docker (i.e. running the tests within 
the Docker Gradle image), using the --cpus flag to limit resources.

Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less 
often. I attempted to increase the various timeouts that are used in the test, 
but this had no effect, so I am going to dig a bit further.

Note that 5 CPUs is already considered "high" (at least for a machine that 
would reasonably run Kafka Streams). The machine I am running on has 12 
logical CPUs (6 cores, 12 threads), and at least when running with all of the 
machine's resources I couldn't replicate the flaky test.

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>  Labels: flaky-test
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> 

[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563162#comment-17563162
 ] 

Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 11:23 AM:
-

So I think I have gotten somewhere. I tried adjusting various timeouts to no 
avail, so I moved on to another theory, which is that some topology/stream-based 
methods are asynchronous, i.e. they are doing something in the background even 
after the calling function has finished. 

To test this theory I have adjusted the flaky test by adding some manual 
sleeps, i.e.
{code:java}
final AddNamedTopologyResult result1 = 
streams.addNamedTopology(topology2Client1);
Thread.sleep(500);
streams2.addNamedTopology(topology2Client2).all().get();
Thread.sleep(500);
result1.all().get();
Thread.sleep(500);{code}
This appears to greatly reduce the flakiness. Originally I used 200 millis, 
which roughly increased the time until failure by about 4x; however, I still 
got a failure, so now I am testing with the 500 millis as shown above.


was (Author: mdedetrich-aiven):
So I think I have gotten somewhere. I tried adjusting various timeouts to no 
avail, so I moved on to another theory, which is that some topology/stream-based 
methods are asynchronous, i.e. they are doing something in the background even 
after the calling function has finished. 

To test this theory I have adjusted the flaky test by adding some manual 
sleeps, i.e.
{code:java}
final AddNamedTopologyResult result1 = 
streams.addNamedTopology(topology2Client1);
Thread.sleep(500);
streams2.addNamedTopology(topology2Client2).all().get();
Thread.sleep(500);
result1.all().get();
Thread.sleep(500);{code}
This appears to greatly reduce the flakiness. Originally I used 200 millis, 
which roughly increased the time until failure by about 4x, and now I am 
testing with the 500 millis as shown above.

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>  Labels: flaky-test
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/



--

[jira] [Commented] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563162#comment-17563162
 ] 

Matthew de Detrich commented on KAFKA-14014:


So I think I have gotten somewhere. I tried adjusting various timeouts to no 
avail, so I moved on to another theory, which is that some topology/stream-based 
methods are asynchronous, i.e. they are doing something in the background even 
after the calling function has finished. 

To test this theory I have adjusted the flaky test by adding some manual 
sleeps, i.e.
{code:java}
final AddNamedTopologyResult result1 = 
streams.addNamedTopology(topology2Client1);
Thread.sleep(500);
streams2.addNamedTopology(topology2Client2).all().get();
Thread.sleep(500);
result1.all().get();
Thread.sleep(500);{code}
This appears to greatly reduce the flakiness. Originally I used 200 millis, 
which roughly increased the time until failure by about 4x, and now I am 
testing with the 500 millis as shown above.
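
For what it's worth, a polling wait is usually more robust than a fixed sleep 
for this kind of race. A minimal sketch, assuming Kafka's own test utility 
org.apache.kafka.test.TestUtils.waitForCondition and a placeholder condition 
(the real test would poll for whatever effect the sleeps are papering over, 
e.g. that the added topology's output is visible):
{code:java}
import org.apache.kafka.test.TestUtils;

final AddNamedTopologyResult result1 = streams.addNamedTopology(topology2Client1);
streams2.addNamedTopology(topology2Client2).all().get();
result1.all().get();

// Poll up to 30s instead of sleeping a fixed 500ms; the condition below is a
// placeholder and should check the effect the Thread.sleep calls wait for.
TestUtils.waitForCondition(
    () -> streams.state() == KafkaStreams.State.RUNNING
        && streams2.state() == KafkaStreams.State.RUNNING,
    30_000L,
    "Streams clients did not stabilize after adding the named topology");
{code}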

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>  Labels: flaky-test
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

2022-07-06 Thread GitBox


tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r914719266


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -186,11 +186,18 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-* Reconfigure log clean config. This simply stops current log cleaners and 
creates new ones.
+* Reconfigure log clean config. This updates desiredRatePerSec in 
Throttler with logCleanerIoMaxBytesPerSecond and stops current log cleaners and 
creates new ones.

Review Comment:
   Thank you. I’ve updated the comment. 
   
https://github.com/apache/kafka/pull/12296/commits/f9d27f6181c3fbc1ef02d56904d58793962dedb2



##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
 } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+
+val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new 
KafkaConfig(oldKafkaProps)),
+  logDirs = Array(TestUtils.tempDir()),
+  logs = new Pool[TopicPartition, UnifiedLog](),
+  logDirFailureChannel = new LogDirFailureChannel(1),
+  time = time)
+
+assertEquals(logCleaner.throttler.desiredRatePerSec, 1000, 
"Throttler.desiredRatePerSec should be initialized with 
KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
+
+val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 2000)
+
+logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new 
KafkaConfig(newKafkaProps))

Review Comment:
   Your concern is correct. Thank you.
   LogCleaner.shutdown() should be called at the end of the test, because 
kafka-log-cleaner-thread-x threads are created in LogCleaner.startup() at the 
end of LogCleaner.reconfigure(), and those threads would otherwise keep 
running. 
   I appended LogCleaner.shutdown() to the end of the test and also used a 
LogCleaner with empty startup() and shutdown() implementations.
   The test is somewhat white-box, since it depends on the 
LogCleaner.reconfigure() implementation, but I couldn't think of any other 
way. Please let me know if you have any.
   
https://github.com/apache/kafka/pull/12296/commits/e05707d5613d096df06c4e96085deb481b00a7e7
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

2022-07-06 Thread GitBox


tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r914719051


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
 } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+
+val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new 
KafkaConfig(oldKafkaProps)),
+  logDirs = Array(TestUtils.tempDir()),
+  logs = new Pool[TopicPartition, UnifiedLog](),
+  logDirFailureChannel = new LogDirFailureChannel(1),
+  time = time)
+
+assertEquals(logCleaner.throttler.desiredRatePerSec, 1000, 
"Throttler.desiredRatePerSec should be initialized with 
KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   Thank you. I’ve updated the assert method parameters and the error message. 
   
https://github.com/apache/kafka/pull/12296/commits/e05707d5613d096df06c4e96085deb481b00a7e7



##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
 } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+
+val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new 
KafkaConfig(oldKafkaProps)),
+  logDirs = Array(TestUtils.tempDir()),
+  logs = new Pool[TopicPartition, UnifiedLog](),
+  logDirFailureChannel = new LogDirFailureChannel(1),
+  time = time)
+
+assertEquals(logCleaner.throttler.desiredRatePerSec, 1000, 
"Throttler.desiredRatePerSec should be initialized with 
KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
+
+val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 2000)
+
+logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new 
KafkaConfig(newKafkaProps))
+
+assertEquals(logCleaner.throttler.desiredRatePerSec, 2000, 
"Throttler.desiredRatePerSec should be updated with new 
KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   Thank you. I’ve updated the error message. 
   
https://github.com/apache/kafka/pull/12296/commits/e05707d5613d096df06c4e96085deb481b00a7e7



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

2022-07-06 Thread GitBox


cadonna commented on code in PR #12384:
URL: https://github.com/apache/kafka/pull/12384#discussion_r914710195


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -237,14 +237,6 @@ Task activeTasksForInputPartition(final TopicPartition 
partition) {
 return activeTasksPerPartition.get(partition);
 }
 
-// TODO: change return type to `StandbyTask`
-Task standbyTask(final TaskId taskId) {
-if (!standbyTasksPerId.containsKey(taskId)) {
-throw new IllegalStateException("Standby task unknown: " + taskId);
-}
-return standbyTasksPerId.get(taskId);
-}
-

Review Comment:
   Not used anywhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

2022-07-06 Thread GitBox


cadonna commented on code in PR #12384:
URL: https://github.com/apache/kafka/pull/12384#discussion_r914709967


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -283,24 +280,6 @@ Collection notPausedTasks() {
 .collect(Collectors.toList());
 }
 
-Set activeTaskIds() {
-return readOnlyActiveTaskIds;
-}
-
-Set standbyTaskIds() {
-return readOnlyStandbyTaskIds;
-}
-
-// TODO: change return type to `StreamTask`
-Map activeTaskMap() {
-return readOnlyActiveTasksPerId;
-}
-
-// TODO: change return type to `StandbyTask`
-Map standbyTaskMap() {
-return readOnlyStandbyTasksPerId;
-}
-

Review Comment:
   Those methods are not used anywhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

2022-07-06 Thread GitBox


cadonna commented on code in PR #12384:
URL: https://github.com/apache/kafka/pull/12384#discussion_r914709611


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -798,12 +798,12 @@ private void closeTaskDirty(final Task task) {
 } catch (final RuntimeException swallow) {
 log.error("Error suspending dirty task {} ", task.id(), swallow);
 }
-tasks.removeTaskBeforeClosing(task.id());
+tasks.removeTask(task.id());

Review Comment:
   Renaming to a more appropriate name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

2022-07-06 Thread GitBox


cadonna commented on code in PR #12384:
URL: https://github.com/apache/kafka/pull/12384#discussion_r914709320


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##
@@ -82,7 +82,7 @@ int process(final int maxNumRecords, final Time time) {
 }
 } catch (final Throwable t) {
 taskExecutionMetadata.registerTaskError(task, t, now);
-
tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
+
tasks.removeTaskFromSuccessfullyProcessedBeforeClosing(lastProcessed);

Review Comment:
   Just a typo



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #12379: KAFKA-10199: Remove call to Task#completeRestoration from state updater

2022-07-06 Thread GitBox


cadonna merged PR #12379:
URL: https://github.com/apache/kafka/pull/12379


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12379: KAFKA-10199: Remove call to Task#completeRestoration from state updater

2022-07-06 Thread GitBox


cadonna commented on PR #12379:
URL: https://github.com/apache/kafka/pull/12379#issuecomment-1176062651

   Test failure is unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / testTaskCancellation() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna opened a new pull request, #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

2022-07-06 Thread GitBox


cadonna opened a new pull request, #12384:
URL: https://github.com/apache/kafka/pull/12384

   To integrate the state updater into the current code, we need the
   ability to add and remove tasks from the task manager. This
   functionality is needed to ensure that a task is managed either
   by the task manager or by the state updater but not by both.
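   
   For illustration, a rough sketch of the intended ownership hand-off 
(`removeTask` appears in this PR; the `addTask` and `stateUpdater` calls here 
are hypothetical stand-ins, not necessarily this PR's API):
   ```java
   // Sketch only: the invariant is that a task is owned by exactly one
   // component at a time, either the task manager or the state updater.
   void handOffToStateUpdater(final Task task) {
       tasks.removeTask(task.id());   // task manager gives up ownership
       stateUpdater.add(task);        // state updater restores the task
   }

   void handBackToTaskManager(final Task task) {
       stateUpdater.remove(task.id());
       tasks.addTask(task);           // hypothetical counterpart of removeTask
   }
   ```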
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14049) Relax Non Null Requirement for KStreamGlobalKTable Left Join

2022-07-06 Thread Saumya Gupta (Jira)
Saumya Gupta created KAFKA-14049:


 Summary: Relax Non Null Requirement for KStreamGlobalKTable Left 
Join
 Key: KAFKA-14049
 URL: https://issues.apache.org/jira/browse/KAFKA-14049
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Saumya Gupta


Null values in the stream for a left join would indicate a tombstone message 
that needs to be propagated even if it is not actually joined with the 
GlobalKTable message; hence these messages should not be ignored.
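
For illustration, a minimal sketch of the join in question (topic names and the 
Order/Customer/EnrichedOrder types are made up). Note that the key selector is 
derived from the stream value, which is one reason null-valued stream records 
are currently dropped before the join:
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Order> orders = builder.stream("orders");
final GlobalKTable<String, Customer> customers = builder.globalTable("customers");

// Today a record with a null Order value is silently dropped here, so a
// tombstone never reaches the join result; this ticket asks to relax that.
final KStream<String, EnrichedOrder> enriched = orders.leftJoin(
    customers,
    (orderId, order) -> order.customerId(),       // key into the global table
    (order, customer) -> new EnrichedOrder(order, customer)); // customer may be null
{code}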



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] singhnama commented on pull request #12359: KAFKA-13983: Support special character in Resource name in ACLs operation by sanitizing

2022-07-06 Thread GitBox


singhnama commented on PR #12359:
URL: https://github.com/apache/kafka/pull/12359#issuecomment-1176054747

   Sanitizing the resource name has a compatibility issue with already-existing 
ACLs, and also there are very few special characters that are not allowed by 
ZooKeeper, which can be found here: 
[https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html](https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html). 
So we decided to fail creation when the resource name contains `/`, since `/` 
is allowed by ZooKeeper but is what creates the problem here.
   
   Not allowed characters (quoting the ZooKeeper docs):
   `The null character (\u0000) cannot be part of a path name. (This causes 
problems with the C binding.)
   
   The following characters can't be used because they don't display well, or 
render in confusing ways: \u0001 - \u0019 and \u007F - \u009F.
   
   The following characters are not allowed: \ud800 - \uF8FF, \uFFF0 - \uFFFF, 
\uXFFFE - \uXFFFF (where X is a digit 1 - E), \uF0000 - \uFFFFF.
   
   The "." character can be used as part of another name, but "." and ".." 
cannot alone be used to indicate a node along a path, because ZooKeeper doesn't 
use relative paths. The following would be invalid: "/a/b/./c" or "/a/b/../c".
   
   The token "zookeeper" is reserved.`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yufeiyan1220 commented on a diff in pull request #12360: KAFKA-14032: Dequeue time for forwarded requests is ignored to set

2022-07-06 Thread GitBox


yufeiyan1220 commented on code in PR #12360:
URL: https://github.com/apache/kafka/pull/12360#discussion_r914629492


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -307,8 +307,12 @@ class KafkaApisTest {
 Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
 val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
 
+val startTimeNanos = time.nanoseconds()
+val dequeueCostNanos = 5 * 1000 * 1000
 val request = TestUtils.buildEnvelopeRequest(
-  alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
time.nanoseconds())
+  alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, 
startTimeNanos)
+// add dequeue time to simulate request handlers poll request from 
requestQueue
+request.requestDequeueTimeNanos = startTimeNanos + dequeueCostNanos

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563060#comment-17563060
 ] 

Bruno Cadonna commented on KAFKA-14014:
---

[~mdedetrich-aiven] Thank you for the investigation! That is indeed interesting!
Maybe you can restructure the test to make it less flaky.

[~wcarlson5][~ableegoldman] Do you maybe have some hints to make the test less 
flaky given Matthew's findings?

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>  Labels: flaky-test
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14014:
--
Labels: flaky-test  (was: )

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>  Labels: flaky-test
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14048) The Next Generation of the Consumer Rebalance Protocol

2022-07-06 Thread David Jacot (Jira)
David Jacot created KAFKA-14048:
---

 Summary: The Next Generation of the Consumer Rebalance Protocol
 Key: KAFKA-14048
 URL: https://issues.apache.org/jira/browse/KAFKA-14048
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


This Jira tracks the development of KIP-848: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dengziming closed pull request #12157: MINOR: Support co-resident mode in KRaft TestKit

2022-07-06 Thread GitBox


dengziming closed pull request #12157: MINOR: Support co-resident mode in KRaft 
TestKit
URL: https://github.com/apache/kafka/pull/12157


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #12157: MINOR: Support co-resident mode in KRaft TestKit

2022-07-06 Thread GitBox


dengziming commented on PR #12157:
URL: https://github.com/apache/kafka/pull/12157#issuecomment-1175912322

   Closing this for now since it has been done in another PR, and I created 
KAFKA-14047 for the discussion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14047) Use KafkaRaftManager in KRaft TestKit

2022-07-06 Thread dengziming (Jira)
dengziming created KAFKA-14047:
--

 Summary: Use KafkaRaftManager in KRaft TestKit
 Key: KAFKA-14047
 URL: https://issues.apache.org/jira/browse/KAFKA-14047
 Project: Kafka
  Issue Type: Test
Reporter: dengziming


We are using the lower-level {{ControllerServer}} and {{BrokerServer}} in 
TestKit; we can improve this to use KafkaRaftManager.

see the discussion here: 
https://github.com/apache/kafka/pull/12157#discussion_r882179407



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dengziming commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating

2022-07-06 Thread GitBox


dengziming commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r914535136


##
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##
@@ -240,6 +239,40 @@ class BrokerMetadataListenerTest {
 }
   }
 
+  @Test
+  def testNotSnapshotAfterMetadataVersionChangeBeforePublishing(): Unit = {
+val snapshotter = new MockMetadataSnapshotter()
+val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+  maxBytesBetweenSnapshots = 1000L)
+
+updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, 
MetadataVersion.latest.featureLevel(), 100L)
+listener.getImageRecords().get()
+assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate 
snapshot before starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChangeWhenStarting(): Unit = {
+val snapshotter = new MockMetadataSnapshotter()
+val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+  maxBytesBetweenSnapshots = 1000L)
+
+updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, 
MetadataVersion.latest.featureLevel(), 100L)
+listener.startPublishing(new MockMetadataPublisher()).get()
+assertEquals(100L, snapshotter.activeSnapshotOffset, "We should try to 
generate snapshot when starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChange(): Unit = {
+val snapshotter = new MockMetadataSnapshotter()
+val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+  maxBytesBetweenSnapshots = 1000L)
+listener.startPublishing(new MockMetadataPublisher()).get()
+
+updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_3_IV2.featureLevel(), 100L)

Review Comment:
   You are right. Here I tried a different solution: update the feature level 
to `MetadataVersion.latest().featureLevel() - 1`, so we can be sure it's 
different from the current feature level. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating

2022-07-06 Thread GitBox


dengziming commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r914533237


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##
@@ -45,26 +45,37 @@ class BrokerMetadataSnapshotter(
*/
   private var _currentSnapshotOffset = -1L
 
+  /**
+   * The offset of the newest snapshot, or -1 if there hasn't been one. 
Accessed only under
+   * the object lock.
+   */
+  private var _latestSnapshotOffset = -1L
+
   /**
* The event queue which runs this listener.
*/
   val eventQueue = new KafkaEventQueue(time, logContext, 
threadNamePrefix.getOrElse(""))
 
   override def maybeStartSnapshot(lastContainedLogTime: Long, image: 
MetadataImage): Boolean = synchronized {
-if (_currentSnapshotOffset == -1L) {
+if (_currentSnapshotOffset != -1) {
+  warn(s"Declining to create a new snapshot at 
${image.highestOffsetAndEpoch()} because " +
+s"there is already a snapshot in progress at offset 
${_currentSnapshotOffset}")
+  false
+} else if (_latestSnapshotOffset >= image.highestOffsetAndEpoch().offset) {

Review Comment:
   Yes, the test failed when generating a snapshot twice at the same offset, 
first because enough bytes had accumulated and then because the metadata 
version changed. I changed it to "==" to make it more accurate and added a 
unit test for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #12380: MINOR: Get rid of agent checks in Jenkinsfile

2022-07-06 Thread GitBox


showuon commented on PR #12380:
URL: https://github.com/apache/kafka/pull/12380#issuecomment-1175906691

   The `ARM build` is still failing. I've rebuilt it and it failed, too.
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12380/2/pipeline/76
   
   I can also take a look when I have time. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563004#comment-17563004
 ] 

Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 7:29 AM:


[~cadonna] So I did some debugging on this ticket over the past week and I 
found out some interesting things.

To start off with, I did manage to predictably replicate the test's flakiness, 
and it does appear to be related to load, i.e. the test is more flaky the 
fewer CPU resources it has. I am using Docker (i.e. running the tests within 
the Docker Gradle image), using the --cpus flag to limit resources.

Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less 
often. I attempted to increase the various timeouts that are used in the test, 
but this had no effect, so I am going to dig a bit further.

Note that 5 CPUs is already considered "high" (at least for a machine that 
would reasonably run Kafka Streams). The machine I am running on has 12 
logical CPUs (6 cores, 12 threads), and at least when running with all of the 
machine's resources I couldn't replicate the flaky test.


was (Author: mdedetrich-aiven):
[~cadonna] So I did some debugging on this ticket over the past week and I 
found out some interesting things.

To start off with, I did manage to predictably replicate the test, and its 
flakiness does appear to be related to load, i.e. the test is more flaky the 
fewer CPU resources it has. I am using Docker (i.e. running the tests within 
the Docker Gradle image), using the --cpus flag to limit resources.

Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less 
often. I attempted to increase the various timeouts that are used in the test, 
but this had no effect, so I am going to dig a bit further.

Note that 5 CPUs is already considered "high" (at least for a machine that 
would reasonably run Kafka Streams). The machine I am running on has 12 
logical CPUs (6 cores, 12 threads), and at least when running with all of the 
machine's resources I couldn't replicate the flaky test.

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> 

[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563004#comment-17563004
 ] 

Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 7:27 AM:


[~cadonna] So I did some debugging on this ticket over the past week and I 
found out some interesting things.

To start off with, I did manage to predictably replicate the test, and its 
flakiness does appear to be related to load, i.e. the test is more flaky the 
fewer CPU resources it has. I am using Docker (i.e. running the tests within 
the Docker Gradle image), using the --cpus flag to limit resources.

Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less 
often. I attempted to increase the various timeouts that are used in the test, 
but this had no effect, so I am going to dig a bit further.

Note that 5 CPUs is already considered "high" (at least for a machine that 
would reasonably run Kafka Streams). The machine I am running on has 12 
logical CPUs (6 cores, 12 threads), and at least when running with all of the 
machine's resources I couldn't replicate the flaky test.


was (Author: mdedetrich-aiven):
[~cadonna] So I did some debugging on this ticket over the past week and I 
found out some interesting things.

To start off with, the test is more flaky the fewer CPU resources it has. I am 
using Docker (i.e. running the tests within the Docker Gradle image), using 
the --cpus flag to limit resources.

Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less 
often. I attempted to increase the various timeouts that are used in the test, 
but this had no effect, so I am going to dig a bit further.

Note that 5 CPUs is already considered "high" (at least for a machine that 
would reasonably run Kafka Streams). The machine I am running on has 12 
logical CPUs (6 cores, 12 threads), and at least when running with all of the 
machine's resources I couldn't replicate the flaky test.

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> 

[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563004#comment-17563004
 ] 

Matthew de Detrich edited comment on KAFKA-14014 at 7/6/22 7:26 AM:


[~cadonna] So I did some debugging on this ticket over the past week and I 
found out some interesting things.

To start off with, the test is more flaky the fewer CPU resources it has. I am 
using Docker (i.e. running the tests within the Docker Gradle image), using 
the --cpus flag to limit resources.

Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less 
often. I attempted to increase the various timeouts that are used in the test, 
but this had no effect, so I am going to dig a bit further.

Note that 5 CPUs is already considered "high" (at least for a machine that 
would reasonably run Kafka Streams). The machine I am running on has 12 
logical CPUs (6 cores, 12 threads), and at least when running with all of the 
machine's resources I couldn't replicate the flaky test.


was (Author: mdedetrich-aiven):
[~cadonna] So I did some debugging on this ticket over the past week and I 
found out some interesting things.

To start off with, the test is more flaky the fewer CPU resources it has. I am 
using Docker (i.e. running the tests within the Docker Gradle image), using 
the --cpus flag to limit resources.

Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit less 
often. I attempted to increase the various timeouts that are used in the test, 
but this had no effect, so I am going to dig a bit further.

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-07-06 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563004#comment-17563004
 ] 

Matthew de Detrich commented on KAFKA-14014:


[~cadonna] So I did some debugging on this ticket over the past week and I 
found out some interesting things.

To start off with, the test is flakier the fewer CPU resources it has. I am 
using Docker (i.e. running the tests within the Docker gradle image), using 
the --cpus flag to limit resources.

Interestingly, I have gone up to 5 CPUs and it's still flaking out, albeit 
less often. I attempted to increase the various timeouts that are used in the 
test, but this had no effect, so I am going to dig a bit further.

> Flaky test 
> NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
> 
>
> Key: KAFKA-14014
> URL: https://issues.apache.org/jira/browse/KAFKA-14014
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Critical
>
> {code:java}
> java.lang.AssertionError: 
> Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
>  but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833)
> {code}
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection

2022-07-06 Thread GitBox


showuon commented on code in PR #12381:
URL: https://github.com/apache/kafka/pull/12381#discussion_r914443627


##
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##
@@ -189,6 +188,10 @@ class BrokerToControllerChannelManagerImpl(
 config.saslInterBrokerHandshakeRequestEnable,
 logContext
   )
+  channelBuilder match {
+    case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
+    case _ =>

Review Comment:
   I think the default case is not needed.
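
   For context, a self-contained sketch of the trade-off (the trait and 
helper below are simplified stand-ins, not Kafka's actual classes): the 
wildcard arm is what keeps a non-`Reconfigurable` builder from throwing 
`scala.MatchError`, so dropping it means switching to an explicit type test.

   ```scala
   // Simplified stand-ins for the types in the diff; not Kafka's real API.
   trait Reconfigurable
   class ChannelBuilder
   class SslChannelBuilder extends ChannelBuilder with Reconfigurable

   object ReconfigurableRegistration extends App {
     def addReconfigurable(r: Reconfigurable): Unit = println(s"registered: $r")

     for (channelBuilder <- Seq(new ChannelBuilder, new SslChannelBuilder)) {
       // Form used in the diff: the `case _ =>` arm makes non-reconfigurable
       // builders a no-op instead of a scala.MatchError.
       channelBuilder match {
         case reconfigurable: Reconfigurable => addReconfigurable(reconfigurable)
         case _ =>
       }

       // Wildcard-free alternative: guard with an explicit type test.
       if (channelBuilder.isInstanceOf[Reconfigurable])
         addReconfigurable(channelBuilder.asInstanceOf[Reconfigurable])
     }
   }
   ```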



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org