[GitHub] [kafka] yashmayya commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect

2023-06-26 Thread via GitHub


yashmayya commented on code in PR #13915:
URL: https://github.com/apache/kafka/pull/13915#discussion_r1243145305


##
docs/connect.html:
##
@@ -313,7 +313,13 @@ REST 
API
 DELETE /connectors/{name} - delete a connector, 
halting all tasks and deleting its configuration
 GET /connectors/{name}/topics - get the set of topics 
that a specific connector is using since the connector was created or since a 
request to reset its set of active topics was issued
 PUT /connectors/{name}/topics/reset - send a request 
to empty the set of active topics of a connector
-GET /connectors/{name}/offsets - get the current offsets for a connector (see KIP-875 for more details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)
+Offsets management REST APIs (see KIP-875 for more details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect):
+
+GET /connectors/{name}/offsets - get the 
current offsets for a connector
+DELETE /connectors/{name}/offsets - reset the 
offsets for a connector. The connector must exist and must be in the stopped 
state.
+PATCH /connectors/{name}/offsets - alter the 
offsets for a connector. The connector must exist and must be in the stopped 
state. The request body should be a JSON object containing a JSON array 
offsets field, similar to the response body of the GET 
/connectors/{name}/offsets REST API.

Review Comment:
   The link to the generated OpenAPI docs is at the end of this section, so I 
don't think it's necessary to add the same link here as well. Also, I think an 
actual example might be more helpful than the generated OpenAPI spec, where the 
finest level of granularity is the `ConnectorOffset` schema describing the 
`partition` and `offset` keys as having JSON object values. I was hoping that the 
link to the KIP would be sufficient, but I do see the value of including 
actual examples directly in the docs as well.
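
   For reference, a minimal sketch of what such an example could look like, using Java 11's built-in HttpClient against a locally running Connect worker on the default port. The connector name and the contents of the `partition` and `offset` objects below are hypothetical; the exact keys inside them are connector-specific. Only the outer shape (a JSON object with an `offsets` array) is what the docs describe.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AlterConnectorOffsets {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Read the current offsets of a (stopped) connector.
        HttpRequest get = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-source-connector/offsets"))
                .GET()
                .build();
        System.out.println(client.send(get, HttpResponse.BodyHandlers.ofString()).body());

        // Alter the offsets: the body is a JSON object with an "offsets" array,
        // mirroring the shape returned by the GET request above.
        String body = "{\"offsets\": [{"
                + "\"partition\": {\"filename\": \"/data/input.txt\"},"  // hypothetical source partition
                + "\"offset\": {\"position\": 1000}"                     // hypothetical source offset
                + "}]}";
        HttpRequest patch = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-source-connector/offsets"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();
        System.out.println(client.send(patch, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```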



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect

2023-06-26 Thread via GitHub


yashmayya commented on code in PR #13915:
URL: https://github.com/apache/kafka/pull/13915#discussion_r1243142955


##
docs/connect.html:
##
@@ -313,7 +313,13 @@ REST 
API
 DELETE /connectors/{name} - delete a connector, 
halting all tasks and deleting its configuration
 GET /connectors/{name}/topics - get the set of topics 
that a specific connector is using since the connector was created or since a 
request to reset its set of active topics was issued
 PUT /connectors/{name}/topics/reset - send a request 
to empty the set of active topics of a connector
-GET /connectors/{name}/offsets - get the current offsets for a connector (see KIP-875 for more details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect)
+Offsets management REST APIs (see KIP-875 for more details: https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect):
+
+GET /connectors/{name}/offsets - get the 
current offsets for a connector
+DELETE /connectors/{name}/offsets - reset the 
offsets for a connector. The connector must exist and must be in the stopped 
state.

Review Comment:
   Hm the `PUT /connectors/{name}/stop` endpoint docs are in the same 
`connect_rest` section as these docs and I don't think it is possible to link 
to individual list items inside the section?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13852: KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics

2023-06-26 Thread via GitHub


hudeqi commented on PR #13852:
URL: https://github.com/apache/kafka/pull/13852#issuecomment-1608769851

   > @hudeqi sorry, this is a tricky issue and I'm trying to take time to think 
things through :)
   > 
   > I hate to say it, but I don't think we can make this change or anything 
like it without a KIP. This is for two reasons:
   > 
   > 1. We're effectively changing the default value for the 
`offset.storage.topic.segment.bytes` property (even if we don't implement this 
change with that exact logic), which counts as a change to public API for the 
project
   > 2. By explicitly setting a value for the offset topic's `segment.bytes` 
property, we cause any broker-side value for the [log.segment.bytes 
property](https://kafka.apache.org/documentation.html#brokerconfigs_log.segment.bytes)
 to be ignored. If the broker uses a lower value for this property than our 
default, then we may make things worse instead of better
   > 
   > I still think it's likely that decreasing the segment size for the offsets 
topic would help, but it'd be nice if we could get the kind of review that a 
KIP requires before making that kind of change.
   > 
   > As far as increasing the number of consumer threads goes, I think it's 
really a question of what the performance bottleneck is when reading to the end 
of the topic. If CPU is the issue, then sure, it'd probably help to scale up 
the number of consumers. However, if network transfer between the worker and 
the Kafka cluster is the limiting factor, then it won't have any impact. The 
nice thing about decreasing the segment size is that (as long as it leads to a 
reduction in the total size of the offsets topic), it would help in either 
case: you'd have less data to consume from Kafka, and also less data to process 
on your Connect worker.
   > 
   > This almost certainly varies depending on the environment Kafka Connect 
and Kafka are run in, but my hunch is that your fix here would be more 
effective than scaling up the number of consumers. I'd be curious to see if we 
could get benchmark numbers on that front, though.
   
   Thanks, I will issue a KIP and reopen this PR later.
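
   (As an aside, until a default change like this lands, one possible stopgap is to lower `segment.bytes` directly on the existing internal topic with the admin client. A rough sketch follows; the bootstrap server, topic name, and segment size are assumptions and depend on the actual MM2/Connect setup.)

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ShrinkOffsetsTopicSegments {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Topic name is an assumption; check the actual internal offsets topic name in your deployment.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "mm2-offsets.source.internal");
            // Override segment.bytes for just this topic (100 MiB here, purely illustrative).
            AlterConfigOp setSegmentBytes = new AlterConfigOp(
                    new ConfigEntry("segment.bytes", String.valueOf(100 * 1024 * 1024)),
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singletonList(setSegmentBytes)))
                .all()
                .get();
        }
    }
}
```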


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13965) Document broker-side socket-server-metrics

2023-06-26 Thread David Jameson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737431#comment-17737431
 ] 

David Jameson commented on KAFKA-13965:
---

This looks like a good first ticket to get familiar with the codebase. I am 
keen to pick it up. 

> Document broker-side socket-server-metrics
> --
>
> Key: KAFKA-13965
> URL: https://issues.apache.org/jira/browse/KAFKA-13965
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 3.2.0
>Reporter: James Cheng
>Priority: Major
>  Labels: newbie, newbie++
>
> There are a bunch of broker JMX metrics in the "socket-server-metrics" space 
> that are not documented on kafka.apache.org/documentation
>  
>  * MBean: 
> kafka.server:type=socket-server-metrics,listener=,networkProcessor=
>  ** From KIP-188: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-188+-+Add+new+metrics+to+support+health+checks]
>  *  
> kafka.server:type=socket-server-metrics,name=connection-accept-rate,listener={listenerName}
>  ** From KIP-612: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]
> It would be helpful to get all the socket-server-metrics documented
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] github-actions[bot] commented on pull request #13375: KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode

2023-06-26 Thread via GitHub


github-actions[bot] commented on PR #13375:
URL: https://github.com/apache/kafka/pull/13375#issuecomment-1608729834

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] iblislin opened a new pull request, #13919: MINOR: doc/streams/dsl-api, fix href of "KTable-KTable Foreign-Key Joins"

2023-06-26 Thread via GitHub


iblislin opened a new pull request, #13919:
URL: https://github.com/apache/kafka/pull/13919

   The `href` shown in ToC is here:
   
https://github.com/apache/kafka/blob/c5889fceddb9a0174452ae60a57c8ff3f087a6a4/docs/streams/developer-guide/dsl-api.html#L52


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jeffkbkim commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1243002218


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializer    The deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the Replayable object.
+   *
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T]
+): CompletableFuture[Void] = {
+val future = new CompletableFuture[Void]()
+val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+  () => doLoad(tp, coordinator, future))
+if (result.isCancelled) {
+  future.completeExceptionally(new RuntimeException("Coordinator loader is 
closed."))
+}
+future
+  }
+
+  private def doLoad(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T],
+future: CompletableFuture[Void]
+  ): Unit = {
+try {
+  replicaManager.getLog(tp) match {
+case None =>
+  future.completeExceptionally(new NotLeaderOrFollowerException(
+s"Could not load records from $tp because the log does not 
exist."))
+
+case Some(log) =>
+  def logEndOffset: Long = 
replicaManager.getLogEndOffset(tp).getOrElse(-1L)
+
+  // buffer may not be needed if records are read from memory
+  var buffer = ByteBuffer.allocate(0)
+  // loop breaks if leader changes at any time during the load, since 
logEndOffset is -1
+  var currOffset = log.logStartOffset
+  // loop breaks if no records have been read, since the end of the 
log has been reached
+  var readAtLeastOneRecord = true
+
+  while (currOffset < logEndOffset && readAtLeastOneRecord && 
isRunning.get) {
+val fetchDataInfo = log.read(
+  startOffset = currOffset,
+  maxLength = loadBufferSize,
+  isolation = FetchIsolation.LOG_END,
+  minOneMessage = true
+)
+
+readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
+val memoryRecords = (fetchDataInfo.records: @unchecked) match {
+  case records: MemoryRecords =>
+records
+
+  case fileRecords: FileRecords =>
+val sizeInBytes = fileRecords.sizeInBytes
+val bytesNeeded = Math.max(loadBufferSize, sizeInBytes)
+
+// minOneMessage = true in the above log.read means that the 
buffer may need to

Review Comment:
   nit: was confused on log.read. maybe use `log.read()`?



##

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-26 Thread via GitHub


gharris1727 commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1242974877


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -104,6 +81,33 @@ public DelegatingClassLoader(List<Path> pluginLocations) {
 this(pluginLocations, DelegatingClassLoader.class.getClassLoader());
 }
 
+public Set<PluginSource> sources() {

Review Comment:
   The refactor was just a little bit more involved (as I had to figure out a 
new mocking strategy for newPluginClassLoader) but I think it's better now. I 
like the static implementation, and I really like how empty the DCL class is 
now. 
   
   I'm not sure exactly where to put the function though, and just stuffed it 
into PluginUtils for now. Do you think this is more appropriate somewhere else?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-26 Thread via GitHub


gharris1727 commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1242962203


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public abstract class PluginScanner {
+
+private static final Logger log = 
LoggerFactory.getLogger(PluginScanner.class);
+
+public PluginScanResult discoverPlugins(Set<PluginSource> sources) {
+long startMs = System.currentTimeMillis();
+List<PluginScanResult> results = new ArrayList<>();
+for (PluginSource source : sources) {
+results.add(scanUrlsAndAddPlugins(source));
+}
+long endMs = System.currentTimeMillis();
+log.info("Scanning plugins with {} took {} ms", 
getClass().getSimpleName(), endMs - startMs);
+return new PluginScanResult(results);
+}
+
+private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
+PluginScanResult plugins = scanPlugins(source);
+loadJdbcDrivers(source.loader());
+return plugins;
+}
+
+protected abstract PluginScanResult scanPlugins(PluginSource source);
+
+private void loadJdbcDrivers(final ClassLoader loader) {
+// Apply here what java.sql.DriverManager does to discover and 
register classes
+// implementing the java.sql.Driver interface.
+AccessController.doPrivileged(
+(PrivilegedAction<Void>) () -> {
+ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
+Driver.class,
+loader
+);
+Iterator<Driver> driversIterator = loadedDrivers.iterator();
+try {
+while (driversIterator.hasNext()) {
+Driver driver = driversIterator.next();
+log.debug(
+"Registered java.sql.Driver: {} to 
java.sql.DriverManager",
+driver
+);
+}
+} catch (Throwable t) {
+log.debug(
+"Ignoring java.sql.Driver classes listed in 
resources but not"
++ " present in class loader's classpath: ",
+t
+);
+}
+return null;
+}
+);
+}
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+protected  PluginDesc pluginDesc(Class plugin, String 
version, ClassLoader loader) {
+return new PluginDesc(plugin, version, loader);
+}
+
+@SuppressWarnings("unchecked")
+protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+SortedSet<PluginDesc<T>> result = new TreeSet<>();
+ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
+try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+T pluginImpl;
+try {
+pluginImpl = iterator.next();
+} catch (ServiceConfigurationError t) {
+log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+continue;
+}
+Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
+if (pluginKlass.getClassLoader() != loader) {
+log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated 

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-26 Thread via GitHub


gharris1727 commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1242961259


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -147,229 +151,7 @@ PluginClassLoader newPluginClassLoader(
 );
 }
 
-public PluginScanResult initLoaders() {
-List<PluginScanResult> results = new ArrayList<>();
-for (Path pluginLocation : pluginLocations) {
-try {
-results.add(registerPlugin(pluginLocation));
-} catch (InvalidPathException | MalformedURLException e) {
-log.error("Invalid path in plugin path: {}. Ignoring.", 
pluginLocation, e);
-} catch (IOException e) {
-log.error("Could not get listing for plugin path: {}. 
Ignoring.", pluginLocation, e);
-}
-}
-// Finally add parent/system loader.
-results.add(scanUrlsAndAddPlugins(
-getParent(),
-ClasspathHelper.forJavaClassPath().toArray(new URL[0])
-));
-PluginScanResult scanResult = new PluginScanResult(results);
-installDiscoveredPlugins(scanResult);
-return scanResult;
-}
-
-private PluginScanResult registerPlugin(Path pluginLocation)
-throws IOException {
-log.info("Loading plugin from: {}", pluginLocation);
-List<URL> pluginUrls = new ArrayList<>();
-for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
-pluginUrls.add(path.toUri().toURL());
-}
-URL[] urls = pluginUrls.toArray(new URL[0]);
-if (log.isDebugEnabled()) {
-log.debug("Loading plugin urls: {}", Arrays.toString(urls));
-}
-PluginClassLoader loader = newPluginClassLoader(
-pluginLocation.toUri().toURL(),
-urls,
-this
-);
-return scanUrlsAndAddPlugins(loader, urls);
-}
-
-private PluginScanResult scanUrlsAndAddPlugins(
-ClassLoader loader,
-URL[] urls
-) {
-PluginScanResult plugins = scanPluginPath(loader, urls);
-log.info("Registered loader: {}", loader);

Review Comment:
   This and other similar log messages are now in PluginScanner, and print out 
during the same phase of plugin scanning as they did before.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan opened a new pull request, #13918: DO NOT MERGE -- testing

2023-06-26 Thread via GitHub


jolshan opened a new pull request, #13918:
URL: https://github.com/apache/kafka/pull/13918

   Trying to debug a flaky test that only seems to fail on CI. This is a version 
of https://github.com/apache/kafka/pull/13787 where I comment out some of the 
noisy logs and include some less noisy ones.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1242874939


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   What is the version if we don't send the request? I think this makes sense 
for API-specific metrics but maybe not the ones for the manager.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1242731206


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
 return new AddPartitionsToTxnRequest(new 
AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), 
version());
 }
 
+public boolean verifyOnlyRequest() {

Review Comment:
   I remember why I made this choice. We want all the requests to be verify 
only because the idea is that verify-only requests require only in-memory checks. 
Adding the partition otherwise requires a write to the log. 
   
   In the case that any partition needs to be added and not just verified, we 
have that log write, which will always take the majority of the time. In this 
case, it makes sense to group with the normal add-partitions requests. However, 
in the case where all transactions are verify only, we will see faster handling 
and therefore want to separate the requests into different metrics so as not to 
bring down the average for metrics like request timing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1242867082


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
 return new AddPartitionsToTxnRequest(new 
AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), 
version());
 }
 
+public boolean verifyOnlyRequest() {
+return version() > 3 &&

Review Comment:
   Unfortunately that introduces a dependency on main in clients. I can add 
such a constant to this request file if that helps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242860978


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializer    The deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](

Review Comment:
   Where are we creating this loader btw? Like BrokerServer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242855495


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerdeTest.java:
##
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RecordSerdeTest {
+@Test
+public void testSerializeKey() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataValue().setEpoch(10),
+(short) 0
+)
+);
+
+assertArrayEquals(
+MessageUtil.toVersionPrefixedBytes(record.key().version(), 
record.key().message()),
+serializer.serializeKey(record)
+);
+}
+
+@Test
+public void testSerializeValue() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1
+),
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataValue().setEpoch(10),
+(short) 0
+)
+);
+
+assertArrayEquals(
+MessageUtil.toVersionPrefixedBytes(record.value().version(), 
record.value().message()),
+serializer.serializeValue(record)
+);
+}
+
+@Test
+public void testSerializeNullValue() {
+RecordSerde serializer = new RecordSerde();
+Record record = new Record(
+new ApiMessageAndVersion(
+new ConsumerGroupMetadataKey().setGroupId("group"),
+(short) 1
+),
+null
+);
+
+assertNull(serializer.serializeValue(record));
+}
+
+@Test
+public void testDeserialize() {
+RecordSerde serDe = new RecordSerde();
+
+ApiMessageAndVersion key = new ApiMessageAndVersion(
+new 

[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242849519


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -26,7 +27,40 @@
  *
  * @param  The type of the record.
  */
-public interface CoordinatorLoader {
+public interface CoordinatorLoader extends AutoCloseable {
+
+/**
+ * UnknownRecordTypeException is thrown when the Deserializer encounters
+ * an unknown record type.
+ */
+class UnknownRecordTypeException extends RuntimeException {
+private final short unknownType;
+
+public UnknownRecordTypeException(short unknownType) {
+super(String.format("Found an unknown record type %d", 
unknownType));
+this.unknownType = unknownType;
+}
+
+public short unknownType() {
+return unknownType;

Review Comment:
   Ah I see in the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242822664


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -26,7 +27,40 @@
  *
  * @param  The type of the record.
  */
-public interface CoordinatorLoader {
+public interface CoordinatorLoader extends AutoCloseable {
+
+/**
+ * UnknownRecordTypeException is thrown when the Deserializer encounters
+ * an unknown record type.
+ */
+class UnknownRecordTypeException extends RuntimeException {
+private final short unknownType;
+
+public UnknownRecordTypeException(short unknownType) {
+super(String.format("Found an unknown record type %d", 
unknownType));
+this.unknownType = unknownType;
+}
+
+public short unknownType() {
+return unknownType;

Review Comment:
   Do we use this anywhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242819355


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorLoader.java:
##
@@ -26,7 +27,40 @@
  *
  * @param  The type of the record.
  */
-public interface CoordinatorLoader {
+public interface CoordinatorLoader extends AutoCloseable {
+
+/**
+ * UnknownRecordTypeException is thrown when the Deserializer encounters
+ * an unknown record type.
+ */
+class UnknownRecordTypeException extends RuntimeException {

Review Comment:
   Did we previously not have such an exception? I'm surprised.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242803548


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>, CoordinatorLoader.Deserializer<Record> {
+@Override
+public byte[] serializeKey(Record record) {
+// Record does not accept a null key.
+return MessageUtil.toVersionPrefixedBytes(
+record.key().version(),
+record.key().message()
+);
+}
+
+@Override
+public byte[] serializeValue(Record record) {
+// Tombstone is represented with a null value.
+if (record.value() == null) {
+return null;
+} else {
+return MessageUtil.toVersionPrefixedBytes(
+record.value().version(),
+record.value().message()
+);
+}
+}
+
+@Override
+public Record deserialize(
+ByteBuffer keyBuffer,
+ByteBuffer valueBuffer
+) throws RuntimeException {
+final short recordType = readVersion(keyBuffer, "key");
+final ApiMessage keyMessage = apiMessageKeyFor(recordType);
+readMessage(keyMessage, keyBuffer, recordType, "key");
+
+if (valueBuffer == null) {
+return new Record(new ApiMessageAndVersion(keyMessage, 
recordType), null);
+}
+
+final ApiMessage valueMessage = apiMessageValueFor(recordType);
+final short valueVersion = readVersion(valueBuffer, "value");

Review Comment:
   This is also just to distinguish the various records, right? Any reason why 
the key is recordType but the value is valueVersion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242789616


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerde.java:
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.MessageUtil;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
+import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
+import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer for {{@link Record}}.
+ */
+public class RecordSerde implements PartitionWriter.Serializer<Record>, CoordinatorLoader.Deserializer<Record> {

Review Comment:
   Did we want to include a reference to the group coordinator in the class 
name? Or is it enough to be in this classpath?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242786634


##
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala:
##
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.log.UnifiedLog
+import kafka.server.ReplicaManager
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{CompressionType, FileRecords, 
MemoryRecords, SimpleRecord}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.UnknownRecordTypeException
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchIsolation, 
LogOffsetMetadata}
+import org.apache.kafka.test.TestUtils.assertFutureThrows
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNull}
+import org.junit.jupiter.api.{Test, Timeout}
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.{mock, verify, when}
+
+import java.nio.ByteBuffer
+import java.nio.charset.Charset
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+class StringKeyValueDeserializer extends 
CoordinatorLoader.Deserializer[(String, String)] {
+  override def deserialize(key: ByteBuffer, value: ByteBuffer): (String, 
String) = {
+(
+  Charset.defaultCharset().decode(key).toString,
+  Charset.defaultCharset().decode(value).toString
+)
+  }
+}
+
+@Timeout(60)
+class CoordinatorLoaderImplTest {
+  @Test
+  def testNonexistentPartition(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  when(replicaManager.getLog(tp)).thenReturn(None)
+
+  val result = loader.load(tp, coordinator)
+  assertFutureThrows(result, classOf[NotLeaderOrFollowerException])
+}
+  }
+
+  @Test
+  def testLoadingIsRejectedWhenClosed(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = mock(classOf[CoordinatorLoader.Deserializer[(String, String)]])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  loader.close()
+
+  val result = loader.load(tp, coordinator)
+  assertFutureThrows(result, classOf[RuntimeException])
+}
+  }
+
+  @Test
+  def testLoading(): Unit = {
+val tp = new TopicPartition("foo", 0)
+val replicaManager = mock(classOf[ReplicaManager])
+val serde = new StringKeyValueDeserializer
+val log = mock(classOf[UnifiedLog])
+val coordinator = mock(classOf[CoordinatorPlayback[(String, String)]])
+
+TestUtils.resource(new CoordinatorLoaderImpl[(String, String)](
+  replicaManager = replicaManager,
+  deserializer = serde,
+  loadBufferSize = 1000
+)) { loader =>
+  when(replicaManager.getLog(tp)).thenReturn(Some(log))
+  when(log.logStartOffset).thenReturn(0L)
+  when(replicaManager.getLogEndOffset(tp)).thenReturn(Some(5L))
+
+  val readResult1 = logReadResult(startOffset = 0, records = Seq(
+new SimpleRecord("k1".getBytes, "v1".getBytes),
+new SimpleRecord("k2".getBytes, "v2".getBytes)
+  ))
+
+  when(log.read(
+startOffset = 0L,
+maxLength = 1000,
+isolation = FetchIsolation.LOG_END,
+minOneMessage = true
+  )).thenReturn(readResult1)
+
+  val readResult2 = logReadResult(startOffset = 2, records = Seq(
+new SimpleRecord("k3".getBytes, "v3".getBytes),
+new 

[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1242731206


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
 return new AddPartitionsToTxnRequest(new 
AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), 
version());
 }
 
+public boolean verifyOnlyRequest() {

Review Comment:
   I remember why I made this choice. We want all the requests to be verify 
only because the idea is that verify-only requests require only in-memory checks. 
Adding the partition otherwise requires a write to the log. 
   
   In the case that any partition needs to be added and not just verified, we 
have that log write, which will always take the majority of the time. In this 
case, it makes sense to group with the normal add-partitions requests. However, 
in the case where all partitions are verify only, we will see faster handling 
and therefore want to separate the requests into different metrics so as not to 
bring down the average for metrics like request timing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1242724200


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   Sorry if this is a bit confusing, but the plan was to have the metrics on the 
manager object rather than on the request type. This is why we report the 
metrics even when we don't send the verification (AddPartitionsToTxn) request.
   
   We can use the AddPartitionsToTxn verify requests to get the timing and 
error rates from the API. However, these metrics are intended to capture the 
full end-to-end handling of the verification. This means the timing will also 
include any queuing/waiting before sending to the coordinator, and we can get 
error rates even when we don't send the request (i.e., a repeat produce request 
with the same producer ID). They are meant to supplement the API metrics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1242721231


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
 return new AddPartitionsToTxnRequest(new 
AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), 
version());
 }
 
+public boolean verifyOnlyRequest() {

Review Comment:
   Let me think on this a bit -- maybe we don't need all the fields to be 
verify only. If I change anything, I will also add a comment explaining it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1242719150


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -2158,6 +2158,12 @@ object TestUtils extends Logging {
   KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)
   }
 
+  def clearYammerMetric(metricName: String): Unit = {

Review Comment:
   I meant to remove this. Thanks for reminding me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect

2023-06-26 Thread via GitHub


C0urante commented on code in PR #13915:
URL: https://github.com/apache/kafka/pull/13915#discussion_r1242245895


##
docs/connect.html:
##
@@ -301,7 +301,7 @@ REST 
API
 GET /connectors/{name}/tasks - get a list of tasks 
currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get 
current status of the task, including if it is running, failed, paused, etc., 
which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector 
and its tasks, which stops message processing until the connector is resumed. 
Any resources claimed by its tasks are left allocated, which allows the 
connector to begin processing data quickly once it is resumed.
-PUT /connectors/{name}/stop - stop the connector and 
shut down its tasks, deallocating any resources claimed by its tasks. This is 
more efficient from a resource usage standpoint than pausing the connector, but 
can cause it to take longer to begin processing data once resumed.
+PUT /connectors/{name}/stop - stop the connector and 
shut down its tasks, deallocating any resources claimed by its tasks. This is 
more efficient from a resource usage standpoint than pausing the connector, but 
can cause it to take longer to begin processing data once resumed. Note that 
the offsets for a connector can be modified via the offsets management REST 
APIs only if it is in the stopped state.

Review Comment:
   ```suggestion
   PUT /connectors/{name}/stop - stop the connector 
and shut down its tasks, deallocating any resources claimed by its tasks. This 
is more efficient from a resource usage standpoint than pausing the connector, 
but can cause it to take longer to begin processing data once resumed. Note 
that the offsets for a connector can be only modified via the offsets 
management endpoints if it is in the stopped state
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13884: MINOR: fix typos for client

2023-06-26 Thread via GitHub


divijvaidya commented on code in PR #13884:
URL: https://github.com/apache/kafka/pull/13884#discussion_r1242669461


##
clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java:
##
@@ -854,7 +854,7 @@ public void shouldThrowOnInvalidDateFormatOrNullTimestamp() 
{
 
 private void checkExceptionForGetDateTimeMethod(Executable executable) {
 assertTrue(assertThrows(ParseException.class, executable)
-.getMessage().contains("Unparseable date"));
+.getMessage().contains("Unparsable date"));

Review Comment:
   we have some test failures associated with this: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13884/3/testReport/junit/org.apache.kafka.common.utils/UtilsTest/Build___JDK_11_and_Scala_2_13___shouldThrowOnInvalidDateFormatOrNullTimestamp__/
 
   
   Please fix the source code along with the test validation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13831: KAFKA-15053: Use case insensitive validator for security.protocol config

2023-06-26 Thread via GitHub


divijvaidya commented on PR #13831:
URL: https://github.com/apache/kafka/pull/13831#issuecomment-1608088036

   @C0urante ah! we have fancy stuff. Thanks for letting me know. 
   
   @bogao007 seems like we might not need the docs change for config after all. 
The only remaining fix is the small nit. We should be ready to merge (assuming 
sane CI tests) after that (unless @C0urante has some additional comments?)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-8977) Remove MockStreamsMetrics Since it is not a Mock

2023-06-26 Thread Joobi S B (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joobi S B reassigned KAFKA-8977:


Assignee: Joobi S B

> Remove MockStreamsMetrics Since it is not a Mock
> 
>
> Key: KAFKA-8977
> URL: https://issues.apache.org/jira/browse/KAFKA-8977
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Joobi S B
>Priority: Minor
>  Labels: newbie
>
> The class {{MockStreamsMetrics}} is used throughout unit tests as a mock but 
> it is not really a mock since it only hides two parameters of the 
> {{StreamsMetricsImpl}} constructor. Either a real mock or the real 
> {{StreamsMetricsImpl}} should be used in the tests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15050) Prompts in the quickstarts

2023-06-26 Thread Joobi S B (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737315#comment-17737315
 ] 

Joobi S B commented on KAFKA-15050:
---

Hi [~tombentley], I've updated the PR with the latest comments; could you please 
have a look: https://github.com/apache/kafka/pull/13862

> Prompts in the quickstarts
> --
>
> Key: KAFKA-15050
> URL: https://issues.apache.org/jira/browse/KAFKA-15050
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Tom Bentley
>Assignee: Joobi S B
>Priority: Trivial
>  Labels: newbie
>
> In the quickstarts [Steps 
> 1-5|https://kafka.apache.org/documentation/#quickstart] use {{$}} to indicate 
> the command prompt. When we start to use Kafka Connect in [Step 
> 6|https://kafka.apache.org/documentation/#quickstart_kafkaconnect] we switch 
> to {{{}>{}}}. The [Kafka Streams 
> quickstart|https://kafka.apache.org/documentation/streams/quickstart] also 
> uses {{{}>{}}}. I don't think there's a reason for this, but if there is one 
> (root vs user account?) it should be explained.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


divijvaidya commented on code in PR #13798:
URL: https://github.com/apache/kafka/pull/13798#discussion_r1242585330


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
 return new AddPartitionsToTxnRequest(new 
AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), 
version());
 }
 
+public boolean verifyOnlyRequest() {
+return version() > 3 &&
+data.transactions().stream().filter(transaction -> 
transaction.verifyOnly()).toArray().length == data.transactions().size();

Review Comment:
   could be replaced with 
`data.transactions().stream().allMatch(AddPartitionsToTxnTransaction::verifyOnly)`
 and this would let the JDK library perform this check optimally. 
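   
   A small self-contained illustration of the difference (the `Txn` class below is a 
stand-in for `AddPartitionsToTxnTransaction`, not the real class):
   
   ```java
   import java.util.List;

   final class AllMatchSketch {
       // Stand-in for AddPartitionsToTxnTransaction; only the verifyOnly flag matters here.
       static final class Txn {
           private final boolean verifyOnly;
           Txn(boolean verifyOnly) { this.verifyOnly = verifyOnly; }
           boolean verifyOnly() { return verifyOnly; }
       }

       // filter(...).toArray().length == size() materializes an array and always walks
       // the whole list; allMatch short-circuits on the first non-verifyOnly element.
       static boolean allVerifyOnly(List<Txn> txns) {
           return txns.stream().allMatch(Txn::verifyOnly);
       }

       public static void main(String[] args) {
           System.out.println(allVerifyOnly(List.of(new Txn(true), new Txn(true))));  // true
           System.out.println(allVerifyOnly(List.of(new Txn(true), new Txn(false)))); // false
       }
   }
   ```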



##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -240,16 +240,17 @@ object RequestChannel extends Logging {
   val responseSendTimeMs = nanosToMs(endTimeNanos - 
responseDequeueTimeNanos)
   val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
   val totalTimeMs = nanosToMs(endTimeNanos - startTimeNanos)
-  val fetchMetricNames =
+  val metricNames =

Review Comment:
   s/metricNames/overrideMetricNames?



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   we are using `VerificationFailureRate` in a couple of places, such as shutdown 
and in tests. Please move it to a constant in the companion object. 



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)

Review Comment:
   you might be interested in adding "version" to the tags



##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -2158,6 +2158,12 @@ object TestUtils extends Logging {
   KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)
   }
 
+  def clearYammerMetric(metricName: String): Unit = {

Review Comment:
   where are we using this?



##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
 return new AddPartitionsToTxnRequest(new 
AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), 
version());
 }
 
+public boolean verifyOnlyRequest() {
+return version() > 3 &&

Review Comment:
   (optional)
   May I suggest adding a constant, `EARLIEST_SUPPORTED_VERSION`, to 
`AddPartitionsToTxnManager` and using it here? It greatly helps 
code readability. 



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -47,13 +55,19 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   private val inflightNodes = mutable.HashSet[Node]()
   private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
 
+  private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  val verificationFailureRate = 
metricsGroup.newMeter("VerificationFailureRate", "failures", TimeUnit.SECONDS)
+  val verificationTimeMs = metricsGroup.newHistogram("VerificationTimeMs")

Review Comment:
   We probably want this (and the failure) metric to be consistent with the metrics 
we have for other requests. To make it consistent, we will have to do things like: 
1\ use a biased histogram 2\ add tags such as request=AddPartitionsToTxn.
   
   Please reference RequestChannel#RequestMetrics for how other APIs 
capture metrics.



##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -154,6 +154,11 @@ public AddPartitionsToTxnRequest normalizeRequest() {
 return new AddPartitionsToTxnRequest(new 
AddPartitionsToTxnRequestData().setTransactions(singletonTransaction()), 
version());
 }
 
+public boolean verifyOnlyRequest() {

Review Comment:
   It would be nice if we could add a Javadoc comment saying that we expect all 
requests from clients with version > 3 to contain verifyOnly. 

[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242623544


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializerThe deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the Replayable object.
+   *
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T]
+): CompletableFuture[Void] = {
+val future = new CompletableFuture[Void]()
+val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+  () => doLoad(tp, coordinator, future))
+if (result.isCancelled) {
+  future.completeExceptionally(new RuntimeException("Coordinator loader is 
closed."))
+}
+future
+  }
+
+  private def doLoad(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T],
+future: CompletableFuture[Void]
+  ): Unit = {
+try {
+  replicaManager.getLog(tp) match {
+case None =>
+  future.completeExceptionally(new NotLeaderOrFollowerException(
+s"Could not load records from $tp because the log does not 
exist."))
+
+case Some(log) =>
+  def logEndOffset: Long = 
replicaManager.getLogEndOffset(tp).getOrElse(-1L)
+
+  // buffer may not be needed if records are read from memory
+  var buffer = ByteBuffer.allocate(0)
+  // loop breaks if leader changes at any time during the load, since 
logEndOffset is -1
+  var currOffset = log.logStartOffset
+  // loop breaks if no records have been read, since the end of the 
log has been reached
+  var readAtLeastOneRecord = true
+
+  while (currOffset < logEndOffset && readAtLeastOneRecord && 
isRunning.get) {
+val fetchDataInfo = log.read(
+  startOffset = currOffset,
+  maxLength = loadBufferSize,
+  isolation = FetchIsolation.LOG_END,
+  minOneMessage = true
+)
+
+readAtLeastOneRecord = fetchDataInfo.records.sizeInBytes > 0
+
+val memoryRecords = (fetchDataInfo.records: @unchecked) match {
+  case records: MemoryRecords =>
+records
+
+  case fileRecords: FileRecords =>
+val sizeInBytes = fileRecords.sizeInBytes
+val bytesNeeded = Math.max(loadBufferSize, sizeInBytes)
+
+// minOneMessage = true in the above log.read means that the 
buffer may need to
+// be grown to ensure progress can be made.
+if (buffer.capacity < bytesNeeded) {
+  if (loadBufferSize < 

[GitHub] [kafka] jolshan commented on a diff in pull request #13880: KAFKA-14462; [19/N] Add CoordinatorLoader implementation

2023-06-26 Thread via GitHub


jolshan commented on code in PR #13880:
URL: https://github.com/apache/kafka/pull/13880#discussion_r1242616962


##
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.coordinator.group
+
+import kafka.server.ReplicaManager
+import kafka.utils.Logging
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorLoader.{Deserializer, 
UnknownRecordTypeException}
+import org.apache.kafka.coordinator.group.runtime.{CoordinatorLoader, 
CoordinatorPlayback}
+import org.apache.kafka.server.util.KafkaScheduler
+import org.apache.kafka.storage.internals.log.FetchIsolation
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.jdk.CollectionConverters._
+
+/**
+ * Coordinator loader which reads records from a partition and replays them
+ * to a group coordinator.
+ *
+ * @param replicaManager  The replica manager.
+ * @param deserializerThe deserializer to use.
+ * @param loadBufferSize  The load buffer size.
+ * @tparam T The record type.
+ */
+class CoordinatorLoaderImpl[T](
+  replicaManager: ReplicaManager,
+  deserializer: Deserializer[T],
+  loadBufferSize: Int
+) extends CoordinatorLoader[T] with Logging {
+  private val isRunning = new AtomicBoolean(true)
+  private val scheduler = new KafkaScheduler(1)
+  scheduler.startup()
+
+  /**
+   * Loads the coordinator by reading all the records from the TopicPartition
+   * and applying them to the Replayable object.
+   *
+   * @param tp  The TopicPartition to read from.
+   * @param coordinator The object to apply records to.
+   */
+  override def load(
+tp: TopicPartition,
+coordinator: CoordinatorPlayback[T]
+): CompletableFuture[Void] = {
+val future = new CompletableFuture[Void]()
+val result = scheduler.scheduleOnce(s"Load coordinator from $tp",
+  () => doLoad(tp, coordinator, future))
+if (result.isCancelled) {

Review Comment:
   This works because we schedule a no-op task when the scheduler is not 
running?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13383: KAFKA-14059 Replace PowerMock with Mockito in WorkerSourceTaskTest

2023-06-26 Thread via GitHub


C0urante commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1242606914


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##
@@ -706,95 +662,72 @@ public void testSourceTaskIgnoresProducerException() 
throws Exception {
 // and no ConnectException will be thrown
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, offset2, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
-expectOffsetFlush(true);
-expectSendRecordOnce();
-expectSendRecordProducerCallbackFail();
-sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), 
EasyMock.isNull());
 
-//As of KAFKA-14079 all offsets should be committed, even for failed 
records (if ignored)
-//Only the last offset will be passed to the method as everything up 
to that point is committed
-//Before KAFKA-14079 offset 12 would have been passed and not 13 as it 
would have been unacked
-offsetWriter.offset(PARTITION, offset2);
-PowerMock.expectLastCall();
+expectOffsetFlush();
+expectPreliminaryCalls();
 
-PowerMock.replayAll();
+when(producer.send(any(ProducerRecord.class), any(Callback.class)))
+.thenAnswer(producerSendAnswer(true))
+.thenAnswer(producerSendAnswer(false));
 
 //Send records and then commit offsets and verify both were committed 
and no exception
-Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
-Whitebox.invokeMethod(workerTask, "sendRecords");
-Whitebox.invokeMethod(workerTask, "updateCommittableOffsets");
+workerTask.toSend = Arrays.asList(record1, record2);
+workerTask.sendRecords();
+workerTask.updateCommittableOffsets();
 workerTask.commitOffsets();
 
-PowerMock.verifyAll();
+//As of KAFKA-14079 all offsets should be committed, even for failed 
records (if ignored)
+//Only the last offset will be passed to the method as everything up 
to that point is committed
+//Before KAFKA-14079 offset 12 would have been passed and not 13 as it 
would have been unacked
+verify(offsetWriter).offset(PARTITION, offset2);
+verify(sourceTask).commitRecord(any(SourceRecord.class), isNull());
 
 //Double check to make sure all submitted records were cleared
-assertEquals(0, ((SubmittedRecords) 
Whitebox.getInternalState(workerTask,
-"submittedRecords")).records.size());
+assertEquals(0, workerTask.submittedRecords.records.size());
 }
 
 @Test
 public void testSlowTaskStart() throws Exception {
 final CountDownLatch startupLatch = new CountDownLatch(1);
 final CountDownLatch finishStartupLatch = new CountDownLatch(1);
-
 createWorkerTask();
 
-offsetStore.start();
-EasyMock.expectLastCall();
-sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
-EasyMock.expectLastCall();
-sourceTask.start(TASK_PROPS);
-EasyMock.expectLastCall().andAnswer(() -> {
+doAnswer((Answer) invocation -> {
 startupLatch.countDown();
-assertTrue(awaitLatch(finishStartupLatch));
+ConcurrencyUtils.awaitLatch(finishStartupLatch, "Timeout waiting 
for task to stop");

Review Comment:
   The purpose of `finishStartupLatch` is to ensure that we invoke 
`WorkerTask::stop` while the `WorkerTask` (on its separate thread) is in the 
middle of invoking `SourceTask::start`.
   
   We should add that logic back. I was only commenting on the error message; 
the rest of the test case looked correct.
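   
   For reference, a minimal, self-contained sketch of the two-latch pattern (not the 
actual WorkerSourceTaskTest code): the task thread blocks inside its simulated 
start() until the main thread, which is guaranteed to be running while start() is 
in flight, does its work (in the real test, invoking stop()) and then releases it:
   
   ```java
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;

   public class LatchCoordinationSketch {
       public static void main(String[] args) throws InterruptedException {
           CountDownLatch startupLatch = new CountDownLatch(1);       // "start() has begun"
           CountDownLatch finishStartupLatch = new CountDownLatch(1); // "now let start() finish"

           Thread taskThread = new Thread(() -> {
               // Simulated SourceTask.start(): announce we are inside, then block.
               startupLatch.countDown();
               try {
                   if (!finishStartupLatch.await(5, TimeUnit.SECONDS)) {
                       throw new IllegalStateException("Timed out waiting to finish startup");
                   }
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return;
               }
               // ... remainder of start() would run here ...
           });
           taskThread.start();

           startupLatch.await();           // wait until the task is mid-start()
           // The real test would invoke stop() right here, while start() is still in flight.
           finishStartupLatch.countDown(); // then allow start() to complete
           taskThread.join();
       }
   }
   ```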



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15126) Change range queries to accept null lower and upper bounds

2023-06-26 Thread Lucia Cerchie (Jira)
Lucia Cerchie created KAFKA-15126:
-

 Summary: Change range queries to accept null lower and upper bounds
 Key: KAFKA-15126
 URL: https://issues.apache.org/jira/browse/KAFKA-15126
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Lucia Cerchie
Assignee: Lucia Cerchie


When web client requests come in with query params, it's common 
for those params to be null. We want developers to just be able to pass in the 
upper/lower bounds if they want, instead of implementing their own logic to 
avoid getting the whole range (which will happen if they leave the params 
null).

An example of the logic they will be able to avoid once this KIP is 
implemented is below:
{code:java}
private RangeQuery> 
createRangeQuery(String lower, String upper) {
if (isBlank(lower) && isBlank(upper)) {
return RangeQuery.withNoBounds();
} else if (!isBlank(lower) && isBlank(upper)) {
return RangeQuery.withLowerBound(lower);
} else if (isBlank(lower) && !isBlank(upper)) {
return RangeQuery.withUpperBound(upper);
} else {
return RangeQuery.withRange(lower, upper);
}
} {code}
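A hedged sketch of the hoped-for simplification (an assumption about this ticket's 
outcome, not current behaviour): if RangeQuery.withRange accepted null bounds as 
"no bound", the branching above would collapse into a single call.
{code:java}
import org.apache.kafka.streams.query.RangeQuery;

public class RangeQuerySketch {
    // Hypothetical post-KIP usage: a null lower/upper bound would simply mean "unbounded".
    static <K, V> RangeQuery<K, V> createRangeQuery(K lower, K upper) {
        return RangeQuery.withRange(lower, upper);
    }
}
{code}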
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe closed pull request #7646: KAFKA-7504: prepare FetchResponses in the request handler threads

2023-06-26 Thread via GitHub


cmccabe closed pull request #7646: KAFKA-7504: prepare FetchResponses in the 
request handler threads 
URL: https://github.com/apache/kafka/pull/7646


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe closed pull request #4442: Add TestKit

2023-06-26 Thread via GitHub


cmccabe closed pull request #4442: Add TestKit
URL: https://github.com/apache/kafka/pull/4442


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #4442: Add TestKit

2023-06-26 Thread via GitHub


cmccabe commented on PR #4442:
URL: https://github.com/apache/kafka/pull/4442#issuecomment-1607963901

   This has been implemented, although in a slightly different form. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe closed pull request #3794: KAFKA-5838. Speed up running system tests in docker a bit with better…

2023-06-26 Thread via GitHub


cmccabe closed pull request #3794: KAFKA-5838. Speed up running system tests in 
docker a bit with better…
URL: https://github.com/apache/kafka/pull/3794


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jaewie commented on pull request #13891: Breaks down SocketServer classes into ConnectionQuotaEntity.scala and ConnectionQuotas.scala

2023-06-26 Thread via GitHub


jaewie commented on PR #13891:
URL: https://github.com/apache/kafka/pull/13891#issuecomment-1607955171

   Hi @kirktrue, I just created one here: 
https://issues.apache.org/jira/browse/KAFKA-15125


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15125) Break down SocketServer classes into separate files

2023-06-26 Thread Jae Wie (Jira)
Jae Wie created KAFKA-15125:
---

 Summary: Break down SocketServer classes into separate files
 Key: KAFKA-15125
 URL: https://issues.apache.org/jira/browse/KAFKA-15125
 Project: Kafka
  Issue Type: Task
  Components: network
Reporter: Jae Wie


Since SocketServer is almost 2000 lines of code, it's easier to move its 
classes into separate files in order to make them more manageable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe closed pull request #3359: KAFKA-5062. Kafka brokers can accept malformed requests which allocat…

2023-06-26 Thread via GitHub


cmccabe closed pull request #3359: KAFKA-5062. Kafka brokers can accept 
malformed requests which allocat…
URL: https://github.com/apache/kafka/pull/3359


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13852: KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics

2023-06-26 Thread via GitHub


C0urante commented on PR #13852:
URL: https://github.com/apache/kafka/pull/13852#issuecomment-1607920082

   @hudeqi sorry, this is a tricky issue and I'm trying to take time to think 
things through :)
   
   I hate to say it, but I don't think we can make this change or anything like 
it without a KIP. This is for two reasons:
   
   1. We're effectively changing the default value for the 
`offset.storage.topic.segment.bytes` property (even if we don't implement this 
change with that exact logic), which counts as a change to public API for the 
project
   2. By explicitly setting a value for the offset topic's `segment.bytes` 
property, we cause any broker-side value for the [log.segment.bytes 
property](https://kafka.apache.org/documentation.html#brokerconfigs_log.segment.bytes)
 to be ignored. If the broker uses a lower value for this property than our 
default, then we may make things worse instead of better
   
   I still think it's likely that decreasing the segment size for the offsets 
topic would help, but it'd be nice if we could get the kind of review that a 
KIP requires before making that kind of change.
   
   As far as increasing the number of consumer threads goes, I think it's 
really a question of what the performance bottleneck is when reading to the end 
of the topic. If CPU is the issue, then sure, it'd probably help to scale up 
the number of consumers. However, if network transfer between the worker and 
the Kafka cluster is the limiting factor, then it won't have any impact. The 
nice thing about decreasing the segment size is that (as long as it leads to a 
reduction in the total size of the offsets topic), it would help in either 
case: you'd have less data to consume from Kafka, and also less data to process 
on your Connect worker.
   
   This almost certainly varies depending on the environment Kafka Connect and 
Kafka are run in, but my hunch is that your fix here would be more effective 
than scaling up the number of consumers. I'd be curious to see if we could get 
benchmark numbers on that front, though.
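   
   For illustration only (not part of this PR; the topic name below is hypothetical), 
this is how a topic-level segment.bytes override would be applied with the Admin 
client. Once set, the broker's log.segment.bytes default no longer applies to that 
topic, which is the concern in point 2 above:
   
   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.Map;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;

   public class OffsetsTopicSegmentBytesSketch {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092"); // illustrative address
           try (Admin admin = Admin.create(props)) {
               // "mm2-offsets.source.internal" is an illustrative name, not a fixed MM2 default.
               ConfigResource topic =
                   new ConfigResource(ConfigResource.Type.TOPIC, "mm2-offsets.source.internal");
               Collection<AlterConfigOp> ops = Collections.singletonList(
                   new AlterConfigOp(new ConfigEntry("segment.bytes", "104857600"),
                       AlterConfigOp.OpType.SET));
               // After this call, the topic-level override wins over log.segment.bytes.
               admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
           }
       }
   }
   ```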


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13845: KAFKA-15078; KRaft leader replys with snapshot for offset 0

2023-06-26 Thread via GitHub


jsancio commented on code in PR #13845:
URL: https://github.com/apache/kafka/pull/13845#discussion_r1235409277


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -1017,7 +1017,16 @@ private FetchResponseData tryCompleteFetchRequest(
 long fetchOffset = request.fetchOffset();
 int lastFetchedEpoch = request.lastFetchedEpoch();
 LeaderState state = quorum.leaderStateOrThrow();
-ValidOffsetAndEpoch validOffsetAndEpoch = 
log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+Optional latestSnapshotId = log.latestSnapshotId();
+final ValidOffsetAndEpoch validOffsetAndEpoch;
+if (fetchOffset == 0 && latestSnapshotId.isPresent()) {

Review Comment:
   Thanks @dengziming 
   
   We had a similar conversation in another PR: 
https://github.com/apache/kafka/pull/13834#discussion_r1224779841
   
   In short, it is not clear to me that these improvements (implementation 
complexities) are a big win for the cluster metadata partition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13721: KAFKA-14782: Implementation Details Different from Documentation (del…

2023-06-26 Thread via GitHub


jolshan commented on PR #13721:
URL: https://github.com/apache/kafka/pull/13721#issuecomment-1607896742

   I think the safer thing would be to just update the text and any documentation 
to not include this in the calculation. 
   I don't think this has caused anyone issues in its current state without 
this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13876: KAFKA-10733: Clean up producer exceptions

2023-06-26 Thread via GitHub


jolshan commented on PR #13876:
URL: https://github.com/apache/kafka/pull/13876#issuecomment-1607891946

   > - Keep things somewhat inconsistent
   > - Do not wrap fatal exceptions and keep the exceptions as-is
   > - Do wrap fatal exceptions and let the users deal with it
   > - Always rethrow with the correct exception type, so we do wrap the 
exception (giving us the right stack traces), but we don't use KafkaException 
but call a new method rethrow on KafkaException that rethrows the exception. 
However, that would certainly require a new KIP.
   
   I'm assuming these options are just for ProducerFencedException and 
InvalidProducerEpochException? Or is this referring to the entire PR? 
   
   I think maybe those two exceptions are the biggest concerns, for the reasons 
you mention above. If that is the case, is consistency the main reason for 
wrapping? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on pull request #13798: KAFKA-15028: AddPartitionsToTxnManager metrics

2023-06-26 Thread via GitHub


jolshan commented on PR #13798:
URL: https://github.com/apache/kafka/pull/13798#issuecomment-1607881974

   The last builds are looking better. @divijvaidya please take a look when you 
get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clayburn commented on pull request #13676: MINOR: Capture build scans on ge.apache.org to benefit from deep build insights

2023-06-26 Thread via GitHub


clayburn commented on PR #13676:
URL: https://github.com/apache/kafka/pull/13676#issuecomment-1607863528

   @ijuma Under our Software License and Sponsorship Agreement, Apache's users 
and contributors may use the Gradle Enterprise instance and run any ASF builds 
with GE enabled solely in connection with the development of any ASF open 
source project. This applies even if the users are employed by a third party or 
making such contributions to an ASF open source project as part of their work 
as directed by a third party employer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15123) Add tests for ChunkedBytesStream

2023-06-26 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya reassigned KAFKA-15123:


Assignee: Max Riedel

> Add tests for ChunkedBytesStream
> 
>
> Key: KAFKA-15123
> URL: https://issues.apache.org/jira/browse/KAFKA-15123
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Assignee: Max Riedel
>Priority: Minor
>  Labels: newbie
>
> We need to add cases against the public interfaces of this class to test for 
> scenarios such as Int overflow in the input parameters.
> Refer: [https://github.com/apache/kafka/pull/12948#discussion_r1242345211] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #13821: KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner and ServiceLoaderScanner

2023-06-26 Thread via GitHub


C0urante commented on code in PR #13821:
URL: https://github.com/apache/kafka/pull/13821#discussion_r1242437655


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.connect.components.Versioned;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public abstract class PluginScanner {
+
+private static final Logger log = 
LoggerFactory.getLogger(PluginScanner.class);
+
+public PluginScanResult discoverPlugins(Set sources) {
+long startMs = System.currentTimeMillis();
+List results = new ArrayList<>();
+for (PluginSource source : sources) {
+results.add(scanUrlsAndAddPlugins(source));
+}
+long endMs = System.currentTimeMillis();
+log.info("Scanning plugins with {} took {} ms", 
getClass().getSimpleName(), endMs - startMs);
+return new PluginScanResult(results);
+}
+
+private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
+PluginScanResult plugins = scanPlugins(source);
+loadJdbcDrivers(source.loader());
+return plugins;
+}
+
+protected abstract PluginScanResult scanPlugins(PluginSource source);
+
+private void loadJdbcDrivers(final ClassLoader loader) {
+// Apply here what java.sql.DriverManager does to discover and 
register classes
+// implementing the java.sql.Driver interface.
+AccessController.doPrivileged(
+(PrivilegedAction) () -> {
+ServiceLoader loadedDrivers = ServiceLoader.load(
+Driver.class,
+loader
+);
+Iterator driversIterator = loadedDrivers.iterator();
+try {
+while (driversIterator.hasNext()) {
+Driver driver = driversIterator.next();
+log.debug(
+"Registered java.sql.Driver: {} to 
java.sql.DriverManager",
+driver
+);
+}
+} catch (Throwable t) {
+log.debug(
+"Ignoring java.sql.Driver classes listed in 
resources but not"
++ " present in class loader's classpath: ",
+t
+);
+}
+return null;
+}
+);
+}
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+protected  PluginDesc pluginDesc(Class plugin, String 
version, ClassLoader loader) {
+return new PluginDesc(plugin, version, loader);
+}
+
+@SuppressWarnings("unchecked")
+protected  SortedSet> getServiceLoaderPluginDesc(Class 
klass, ClassLoader loader) {
+SortedSet> result = new TreeSet<>();
+ServiceLoader serviceLoader = ServiceLoader.load(klass, loader);
+for (Iterator iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
+try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+T pluginImpl;
+try {
+pluginImpl = iterator.next();
+} catch (ServiceConfigurationError t) {
+log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+continue;
+}
+Class pluginKlass = (Class) 
pluginImpl.getClass();
+if (pluginKlass.getClassLoader() != loader) {
+log.debug("{} from other classloader {} is visible from 
{}, excluding to prevent isolated loading",

[jira] [Commented] (KAFKA-15123) Add tests for ChunkedBytesStream

2023-06-26 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737267#comment-17737267
 ] 

Divij Vaidya commented on KAFKA-15123:
--

Sure thing. Please go ahead. I assigned this to you.

> Add tests for ChunkedBytesStream
> 
>
> Key: KAFKA-15123
> URL: https://issues.apache.org/jira/browse/KAFKA-15123
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Assignee: Max Riedel
>Priority: Minor
>  Labels: newbie
>
> We need to add cases against the public interfaces of this class to test for 
> scenarios such as Int overflow in the input parameters.
> Refer: [https://github.com/apache/kafka/pull/12948#discussion_r1242345211] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-9015) Unify metric names

2023-06-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-9015:
-
Labels: easy kip needs-kip newbie  (was: easy kip newbie)

> Unify metric names
> --
>
> Key: KAFKA-9015
> URL: https://issues.apache.org/jira/browse/KAFKA-9015
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 2.5.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Csenge Maruzsi
>Priority: Major
>  Labels: easy, kip, needs-kip, newbie
>
> Some of the metrics use a lower-case style metric name like "i-am-a-metric" 
> while the majority use UpperCamelCase (IAmAnotherMetric). We need 
> consistency across the project, and since the majority of the metrics use the 
> camel case notation, we need to change the others.
> We might have to think about backward compatibility and also a KIP may be 
> needed since metrics are public interfaces.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15118) Need for a Centralized Configuration Management System in Apache Kafka

2023-06-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15118?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15118:
--
Labels: needs-kip  (was: )

> Need for a Centralized Configuration Management System in Apache Kafka
> --
>
> Key: KAFKA-15118
> URL: https://issues.apache.org/jira/browse/KAFKA-15118
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.1
>Reporter: Jimmy Wang
>Priority: Major
>  Labels: needs-kip
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Hi all,
> In our use of Apache Kafka, we found something strange:
> Despite our partitions being well-distributed across brokers, we noticed a 
> significant discrepancy in disk usage between different brokers. 
> Specifically, the same partition takes up different amounts of disk space on 
> different brokers. Upon investigating, we found that the root cause of this 
> discrepancy was a variation in the {{log.retention.hours}} setting between 
> brokers.
> On the one hand, we know that we should make sure that server.properties is 
> the same across the whole cluster. But on the other hand, could Kafka 
> provide a guarantee or an upfront check that all broker configurations are 
> the same, to avoid such situations?
> Here are some of my suggestions:
>  # Provide a centralized configuration center, or manage configurations in 
> internal topics (KRaft mode) or ZooKeeper, like dynamic configuration.
>  # Warn users about potential inconsistencies during the broker startup 
> process.
> Best regards,
> Jimmy Wang



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15118) Need for a Centralized Configuration Management System in Apache Kafka

2023-06-26 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737245#comment-17737245
 ] 

Kirk True commented on KAFKA-15118:
---

[~jimmywang611] Thanks for the ticket. Centralized configuration is something 
we've talked about doing for {_}clients{_}, but brokers could benefit from this 
too. I'm not sure if this is something that should be handled inside Kafka or 
relegated to the orchestration layer outside of Kafka.

Either way, this project would have a lot of ramifications, so a KIP would 
almost certainly be needed. Is that something that you plan to write up?

Thanks!

> Need for a Centralized Configuration Management System in Apache Kafka
> --
>
> Key: KAFKA-15118
> URL: https://issues.apache.org/jira/browse/KAFKA-15118
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.7.1
>Reporter: Jimmy Wang
>Priority: Major
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Hi all,
> In our use of Apache Kafka, we found something strange:
> Despite our partitions being well-distributed across brokers, we noticed a 
> significant discrepancy in disk usage between different brokers. 
> Specifically, the same partition takes up different amounts of disk space on 
> different brokers. Upon investigating, we found that the root cause of this 
> discrepancy was a variation in the {{log.retention.hours}} setting between 
> brokers.
> On the one hand, we know that we should make sure that server.properties is 
> the same across the whole cluster. But on the other hand, could Kafka 
> provide a guarantee or an upfront check that all broker configurations are 
> the same, to avoid such situations?
> Here are some of my suggestions:
>  # Provide a centralized configuration center, or manage configurations in 
> internal topics (KRaft mode) or ZooKeeper, like dynamic configuration.
>  # Warn users about potential inconsistencies during the broker startup 
> process.
> Best regards,
> Jimmy Wang



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-26 Thread via GitHub


jsancio commented on code in PR #13917:
URL: https://github.com/apache/kafka/pull/13917#discussion_r1242449562


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -978,9 +978,9 @@ public static void atomicMoveWithFallback(Path source, Path 
target, boolean need
 Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
 } catch (IOException outer) {
 try {
+log.warn("Failed atomic move of {} to {} retring with a 
non-atomic move", source, target, outer);
 Files.move(source, target, 
StandardCopyOption.REPLACE_EXISTING);
-log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,
-outer.getMessage());
+log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed", source, target);

Review Comment:
   Yeah. I think so. I care more about the atomic move failing and less about 
the non-atomic move succeeding.
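   
   Putting the two points together, a minimal sketch of the pattern being discussed 
(not the actual Utils.atomicMoveWithFallback; plain stdout/stderr stand in for the 
logger): WARN when the atomic move fails, retry non-atomically, and keep the 
success message at DEBUG:
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   final class AtomicMoveSketch {
       static void atomicMoveWithFallback(Path source, Path target) throws IOException {
           try {
               Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
           } catch (IOException outer) {
               // WARN: the interesting event is the atomic move failing, so log it
               // (with the exception) before attempting the non-atomic fallback.
               System.err.printf("WARN Failed atomic move of %s to %s, retrying with a non-atomic move: %s%n",
                   source, target, outer);
               Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
               // DEBUG: the successful fallback itself is less noteworthy.
               System.out.printf("DEBUG Non-atomic move of %s to %s succeeded after atomic move failed%n",
                   source, target);
           }
       }
   }
   ```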



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15122) Moving partitions between log dirs leads to kafka.log:type=Log metrics being deleted

2023-06-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-15122.

Fix Version/s: 3.5.0
   Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/KAFKA-14544 which is fixed 
in 3.5.0

> Moving partitions between log dirs leads to kafka.log:type=Log metrics being 
> deleted
> 
>
> Key: KAFKA-15122
> URL: https://issues.apache.org/jira/browse/KAFKA-15122
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.5.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.5.0
>
>
> # Start a broker with 2 log directories
> # Create a topic-partition
> Metrics with the following names are created: 
> kafka.log:type=Log,name=Size,topic=,partition=0
> # Using kafka-reassign-partitions move that partition to the other log 
> directory
> A tag is-future=true is added to the existing metrics, 
> kafka.log:type=Log,name=Size,topic=,partition=0,is-future=true
> # Using kafka-reassign-partitions move that partition back to its original 
> log directory
> The metrics are deleted!
> I don't expect the metrics to be renamed during the first reassignment. The 
> metrics should not be deleted during the second reassignment, since the topic 
> still exists. Restarting the broker resolves the issue.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15122) Moving partitions between log dirs leads to kafka.log:type=Log metrics being deleted

2023-06-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-15122:
---
Affects Version/s: 3.4.0
   (was: 3.5.0)

> Moving partitions between log dirs leads to kafka.log:type=Log metrics being 
> deleted
> 
>
> Key: KAFKA-15122
> URL: https://issues.apache.org/jira/browse/KAFKA-15122
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.4.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.5.0
>
>
> # Start a broker with 2 log directories
> # Create a topic-partition
> Metrics with the following names are created: 
> kafka.log:type=Log,name=Size,topic=,partition=0
> # Using kafka-reassign-partitions move that partition to the other log 
> directory
> A tag is-future=true is added to the existing metrics, 
> kafka.log:type=Log,name=Size,topic=,partition=0,is-future=true
> # Using kafka-reassign-partitions move that partition back to its original 
> log directory
> The metrics are deleted!
> I don't expect the metrics to be renamed during the first reassignment. The 
> metrics should not be deleted during the second reassignment, since the topic 
> still exists. Restarting the broker resolves the issue.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rondagostino commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-26 Thread via GitHub


rondagostino commented on code in PR #13917:
URL: https://github.com/apache/kafka/pull/13917#discussion_r1242445724


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -978,9 +978,9 @@ public static void atomicMoveWithFallback(Path source, Path 
target, boolean need
 Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
 } catch (IOException outer) {
 try {
+log.warn("Failed atomic move of {} to {} retring with a 
non-atomic move", source, target, outer);
 Files.move(source, target, 
StandardCopyOption.REPLACE_EXISTING);
-log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,
-outer.getMessage());
+log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed", source, target);

Review Comment:
   Maybe it is okay now that we are logging the initial WARN?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-26 Thread via GitHub


rondagostino commented on code in PR #13917:
URL: https://github.com/apache/kafka/pull/13917#discussion_r1242444579


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -978,9 +978,9 @@ public static void atomicMoveWithFallback(Path source, Path 
target, boolean need
 Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
 } catch (IOException outer) {
 try {
+log.warn("Failed atomic move of {} to {} retring with a 
non-atomic move", source, target, outer);
 Files.move(source, target, 
StandardCopyOption.REPLACE_EXISTING);
-log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,
-outer.getMessage());
+log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed", source, target);

Review Comment:
   Remain DEBUG?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-26 Thread via GitHub


jsancio commented on code in PR #13917:
URL: https://github.com/apache/kafka/pull/13917#discussion_r1242443198


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -979,7 +979,7 @@ public static void atomicMoveWithFallback(Path source, Path 
target, boolean need
 } catch (IOException outer) {
 try {
 Files.move(source, target, 
StandardCopyOption.REPLACE_EXISTING);
-log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,
+log.warn("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,

Review Comment:
   I agree. I added a WARN log message after the atomic move failed and before 
the non-atomic move.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-26 Thread via GitHub


jsancio commented on code in PR #13917:
URL: https://github.com/apache/kafka/pull/13917#discussion_r1242429861


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -979,7 +979,7 @@ public static void atomicMoveWithFallback(Path source, Path 
target, boolean need
 } catch (IOException outer) {
 try {
 Files.move(source, target, 
StandardCopyOption.REPLACE_EXISTING);
-log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,
+log.warn("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,

Review Comment:
   Hmm. Log levels are hard to argue about. To me, it is really bad if a `move` fails 
and Kafka falls back to `copy`.
   
   There is a lot of code, like `Snapshot.freeze`, that assumes `move` 
semantics for correctness.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-26 Thread via GitHub


rondagostino commented on code in PR #13917:
URL: https://github.com/apache/kafka/pull/13917#discussion_r1242428311


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -979,7 +979,7 @@ public static void atomicMoveWithFallback(Path source, Path 
target, boolean need
 } catch (IOException outer) {
 try {
 Files.move(source, target, 
StandardCopyOption.REPLACE_EXISTING);
-log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,
+log.warn("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,

Review Comment:
   And maybe we should log something above the retry saying we are going to 
retry?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13917: MINOR; Failed move should be logged at WARN

2023-06-26 Thread via GitHub


rondagostino commented on code in PR #13917:
URL: https://github.com/apache/kafka/pull/13917#discussion_r1242424176


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -979,7 +979,7 @@ public static void atomicMoveWithFallback(Path source, Path 
target, boolean need
 } catch (IOException outer) {
 try {
 Files.move(source, target, 
StandardCopyOption.REPLACE_EXISTING);
-log.debug("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,
+log.warn("Non-atomic move of {} to {} succeeded after atomic 
move failed due to {}", source, target,

Review Comment:
   Is it really a WARN?  Maybe INFO is more appropriate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #13917: MINOR; Failed move should be logged at WARN

2023-06-26 Thread via GitHub


jsancio opened a new pull request, #13917:
URL: https://github.com/apache/kafka/pull/13917

   When Kafka fails to perform a file move, the error is swallowed. Kafka 
should log these cases at least at WARN level.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15122) Moving partitions between log dirs leads to kafka.log:type=Log metrics being deleted

2023-06-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-15122:
---
Description: 
# Start a broker with 2 log directories
# Create a topic-partition
Metrics with the following names are created: 
kafka.log:type=Log,name=Size,topic=,partition=0
# Using kafka-reassign-partitions move that partition to the other log directory
A tag is-future=true is added to the existing metrics, 
kafka.log:type=Log,name=Size,topic=,partition=0,is-future=true
# Using kafka-reassign-partitions move that partition back to its original log 
directory
The metrics are deleted!

I don't expect the metrics to be renamed during the first reassignment. The 
metrics should not be deleted during the second reassignment, the topic still 
exists. Restarting the broker resolves the issue.

 

  was:
# Start a broker with 2 log directories
# Create a topic-partition
Metrics with the following names are created: 
kafka.log:type=Log,name=Size,topic=,partition=0
# Using kafka-reassign-partitions move that partition to the other log directory
A tag isFuture=true is added to the existing metrics, 
kafka.log:type=Log,name=Size,topic=,partition=0,isFuture=true
# Using kafka-reassign-partitions move that partition back to its original log 
directory
The metrics are deleted!

I don't expect the metrics to be renamed during the first reassignment. The 
metrics should not be deleted during the second reassignment, the topic still 
exists. Restarting the broker resolves the issue.

 


> Moving partitions between log dirs leads to kafka.log:type=Log metrics being 
> deleted
> 
>
> Key: KAFKA-15122
> URL: https://issues.apache.org/jira/browse/KAFKA-15122
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.5.0
>Reporter: Mickael Maison
>Priority: Major
>
> # Start a broker with 2 log directories
> # Create a topic-partition
> Metrics with the following names are created: 
> kafka.log:type=Log,name=Size,topic=,partition=0
> # Using kafka-reassign-partitions move that partition to the other log 
> directory
> A tag is-future=true is added to the existing metrics, 
> kafka.log:type=Log,name=Size,topic=,partition=0,is-future=true
> # Using kafka-reassign-partitions move that partition back to its original 
> log directory
> The metrics are deleted!
> I don't expect the metrics to be renamed during the first reassignment. The 
> metrics should not be deleted during the second reassignment, the topic still 
> exists. Restarting the broker resolves the issue.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15123) Add tests for ChunkedBytesStream

2023-06-26 Thread Max Riedel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737201#comment-17737201
 ] 

Max Riedel commented on KAFKA-15123:


Hi [~divijvaidya],
This sounds like a good task to start working on Kafka. I would like to pick it 
up.

> Add tests for ChunkedBytesStream
> 
>
> Key: KAFKA-15123
> URL: https://issues.apache.org/jira/browse/KAFKA-15123
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Minor
>  Labels: newbie
>
> We need to add cases against the public interfaces of this class to test for 
> scenarios for Int overflow etc. for input parameters
> Refer: [https://github.com/apache/kafka/pull/12948#discussion_r1242345211] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15124) KRaft snapshot freeze should never perform copy

2023-06-26 Thread Jira
José Armando García Sancio created KAFKA-15124:
--

 Summary: KRaft snapshot freeze should never perform copy
 Key: KAFKA-15124
 URL: https://issues.apache.org/jira/browse/KAFKA-15124
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-26 Thread via GitHub


divijvaidya commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1242377704


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException {
 
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
-long bytesSkipped = (avail < remaining) ? avail : remaining;
+int bytesSkipped = Math.min(avail, (int) remaining);

Review Comment:
   > It does show a gap in the tests for this class, we should ideally have 
tests that cover these boundary cases.
   
   Fair point. Added a JIRA https://issues.apache.org/jira/browse/KAFKA-15123 
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15123) Add tests for ChunkedBytesStream

2023-06-26 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15123:


 Summary: Add tests for ChunkedBytesStream
 Key: KAFKA-15123
 URL: https://issues.apache.org/jira/browse/KAFKA-15123
 Project: Kafka
  Issue Type: Improvement
Reporter: Divij Vaidya


We need to add cases against the public interfaces of this class to test for 
scenarios for Int overflow etc. for input parameters

Refer: [https://github.com/apache/kafka/pull/12948#discussion_r1242345211] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15122) Moving partitions between log dirs leads to kafka.log:type=Log metrics being deleted

2023-06-26 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-15122:
--

 Summary: Moving partitions between log dirs leads to 
kafka.log:type=Log metrics being deleted
 Key: KAFKA-15122
 URL: https://issues.apache.org/jira/browse/KAFKA-15122
 Project: Kafka
  Issue Type: Task
Affects Versions: 3.5.0
Reporter: Mickael Maison


# Start a broker with 2 log directories
# Create a topic-partition
Metrics with the following names are created: 
kafka.log:type=Log,name=Size,topic=,partition=0
# Using kafka-reassign-partitions move that partition to the other log directory
A tag isFuture=true is added to the existing metrics, 
kafka.log:type=Log,name=Size,topic=,partition=0,isFuture=true
# Using kafka-reassign-partitions move that partition back to its original log 
directory
The metrics are deleted!

I don't expect the metrics to be renamed during the first reassignment. The 
metrics should not be deleted during the second reassignment, the topic still 
exists. Restarting the broker resolves the issue.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-26 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1242345211


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException {
 
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
-long bytesSkipped = (avail < remaining) ? avail : remaining;
+int bytesSkipped = Math.min(avail, (int) remaining);

Review Comment:
   > behaviour is implementation defined
   
   Can you share a reference for this? My understanding is that the Java 
specifies the behavior in these cases. Perhaps you're thinking of a C or 
something like that?
   
   > let's say remaining = Integer.MIN_VALUE + 1000
   
   I think you meant to say that a `long` value of 
(Integer.MAX_VALUE + 1000) may result in sign extension during truncation, 
resulting in a negative int. That's a fair point - my bad. I'll fix this. It 
does show a gap in the tests for this class; we should ideally have tests that 
cover these boundary cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15053) Regression for security.protocol validation starting from 3.3.0

2023-06-26 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15053:
-
Fix Version/s: 3.6.0

> Regression for security.protocol validation starting from 3.3.0
> ---
>
> Key: KAFKA-15053
> URL: https://issues.apache.org/jira/browse/KAFKA-15053
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Bo Gao
>Assignee: Bo Gao
>Priority: Major
> Fix For: 3.6.0
>
>
> [This|https://issues.apache.org/jira/browse/KAFKA-13793] Jira issue 
> introduced validations on multiple configs. As a consequence, config 
> {{security.protocol}} now only allows upper case values such as PLAINTEXT, 
> SSL, SASL_PLAINTEXT, SASL_SSL. Before this change, lower case values like 
> sasl_ssl, ssl are also supported, there's even a case insensitive logic 
> inside 
> [SecurityProtocol|https://github.com/apache/kafka/blob/146a6976aed0d9f90c70b6f21dca8b887cc34e71/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java#L70-L73]
>  to handle the lower case values.
> I think we should treat this as a regression bug since we don't support lower 
> case values anymore since 3.3.0. For versions later than 3.3.0, we are 
> getting error like this when using lower case value sasl_ssl
> {{Invalid value sasl_ssl for configuration security.protocol: String must be 
> one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
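
A small illustration of the mismatch described in this ticket: SecurityProtocol itself resolves names case-insensitively, while the config validation added by KAFKA-13793 only accepts the upper-case spellings. The class name below is made up for the example.

```java
import org.apache.kafka.common.security.auth.SecurityProtocol;

public final class SecurityProtocolCaseExample {
    public static void main(String[] args) {
        // Resolves fine: forName() upper-cases the input internally.
        SecurityProtocol protocol = SecurityProtocol.forName("sasl_ssl");
        System.out.println(protocol); // SASL_SSL

        // By contrast, configuring a client with security.protocol=sasl_ssl on an
        // affected 3.3.0+ release fails validation with:
        // "Invalid value sasl_ssl for configuration security.protocol: String must be
        //  one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL"
    }
}
```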


[GitHub] [kafka] divijvaidya commented on pull request #13700: KAFKA-14959: remove delayed queue and exempt sensors during ClientQuota and ClientRequestQuota managers shutdown

2023-06-26 Thread via GitHub


divijvaidya commented on PR #13700:
URL: https://github.com/apache/kafka/pull/13700#issuecomment-1607659721

   I wanted to let you know that this is on my radar and I need a few more days 
to get to this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-26 Thread via GitHub


divijvaidya commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1242328144


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException {
 
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
-long bytesSkipped = (avail < remaining) ? avail : remaining;
+int bytesSkipped = Math.min(avail, (int) remaining);

Review Comment:
   I am concerned about cases where `remaining` is outside the range of 
representable `int` values, let's say `remaining = Integer.MIN_VALUE + 1000`.  
When downcasting in cases of overflow (i.e. in `Math.min(avail, (int) 
remaining)`), the behaviour is implementation defined and the numeric value of 
`(int) remaining` may end up as a negative int. Taking the min of this 
negative value and `avail` will make `bytesSkipped` negative.
   
   In the implementation I suggested, the result of `Math.min((long) avail, 
remaining)` is guaranteed to fit in `int` because it is upper-bounded by `avail`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
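
A short, self-contained demonstration of the truncation concern raised above. The variable names mirror the snippet under review, but this is a standalone sketch, not the actual ChunkedBytesStream code.

```java
public final class SkipTruncationExample {
    public static void main(String[] args) {
        int avail = 1000;                            // bytes available in the intermediate buffer
        long remaining = Integer.MAX_VALUE + 1000L;  // a skip request that does not fit in an int

        // Unsafe: narrowing the long to int wraps around to a negative value,
        // so the "minimum" ends up negative.
        int unsafe = Math.min(avail, (int) remaining);
        System.out.println(unsafe);                  // prints a large negative number

        // Safe: widen avail to long first, then narrow the result, which is
        // guaranteed to fit in an int because it is bounded by avail.
        int safe = (int) Math.min((long) avail, remaining);
        System.out.println(safe);                    // prints 1000
    }
}
```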



[jira] [Commented] (KAFKA-15105) Flaky test FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable

2023-06-26 Thread Max Riedel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17737186#comment-17737186
 ] 

Max Riedel commented on KAFKA-15105:


I would like to work on this issue. I'm still trying to understand how the 
build infrastructure works. Can someone give me a hint on how to reproduce the 
behavior?

> Flaky test 
> FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable
> -
>
> Key: KAFKA-15105
> URL: https://issues.apache.org/jira/browse/KAFKA-15105
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.5.0
>Reporter: Josep Prat
>Priority: Major
>  Labels: flaky-test
>
> Test  
> integration.kafka.server.FetchFromFollowerIntegrationTest.testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable()
>  became flaky. An example can be found here: 
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13865/2/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_11_and_Scala_2_13___testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable__/]
> The error might be caused because of a previous kafka cluster used for 
> another test wasn't cleaned up properly before this one run.
>  
> h3. Error Message
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists.{code}
> h3. Stacktrace
> {code:java}
> org.apache.kafka.common.errors.TopicExistsException: Topic 
> '__consumer_offsets' already exists. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13831: KAFKA-15053: Use case insensitive validator for security.protocol config

2023-06-26 Thread via GitHub


divijvaidya commented on code in PR #13831:
URL: https://github.com/apache/kafka/pull/13831#discussion_r1242302387


##
docs/security.html:
##
@@ -72,6 +72,10 @@ SSL
   SASL_PLAINTEXT
   SASL_SSL
+  plaintext

Review Comment:
   Instead of adding all values, please consider the following:
   
   `Possible options (case insensitive) for the security protocol are given 
below:`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-26 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1242312432


##
Jenkinsfile:
##
@@ -155,79 +155,23 @@ pipeline {
 echo 'Skipping Kafka Streams archetype test for Java 17'
   }
 }
-
-// To avoid excessive Jenkins resource usage, we only run the stages
-// above at the PR stage. The ones below are executed after changes
-// are pushed to trunk and/or release branches. We achieve this via
-// the `when` clause.
-
-stage('JDK 8 and Scala 2.13') {

Review Comment:
   That statement isn't strictly true since there could be bugs in the 
implementation - that's why we had these tests in the first place. But the 
truth is that they're low value since (1) the probability of a bug affecting 
JDK 8 and Scala 2.13 but not JDK 8 and Scala 2.12 is low (2) having too many 
variants makes it even less likely for people to pay attention to the flaky 
failures that exist (3) it's expensive to run so many variants.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-26 Thread via GitHub


ijuma commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1242309422


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException {
 
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
-long bytesSkipped = (avail < remaining) ? avail : remaining;
+int bytesSkipped = Math.min(avail, (int) remaining);

Review Comment:
   Can you please explain an example where these two variants would have a 
different result given that `avail` is a positive int?



##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException {
 
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
-long bytesSkipped = (avail < remaining) ? avail : remaining;
+int bytesSkipped = Math.min(avail, (int) remaining);

Review Comment:
   Can you please share an example where these two variants would have a 
different result given that `avail` is a positive int?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12685: KAFKA-14945: Add Serializer#serializeToByteBuffer() to reduce memory copying

2023-06-26 Thread via GitHub


divijvaidya commented on PR #12685:
URL: https://github.com/apache/kafka/pull/12685#issuecomment-1607609145

   @LinShunKang Give me a few days. I will get to it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13915: KAFKA-14930: Document the new PATCH and DELETE offsets REST APIs for Connect

2023-06-26 Thread via GitHub


C0urante commented on code in PR #13915:
URL: https://github.com/apache/kafka/pull/13915#discussion_r1242245895


##
docs/connect.html:
##
@@ -301,7 +301,7 @@ REST 
API
 GET /connectors/{name}/tasks - get a list of tasks 
currently running for a connector
 GET /connectors/{name}/tasks/{taskid}/status - get 
current status of the task, including if it is running, failed, paused, etc., 
which worker it is assigned to, and error information if it has failed
 PUT /connectors/{name}/pause - pause the connector 
and its tasks, which stops message processing until the connector is resumed. 
Any resources claimed by its tasks are left allocated, which allows the 
connector to begin processing data quickly once it is resumed.
-PUT /connectors/{name}/stop - stop the connector and 
shut down its tasks, deallocating any resources claimed by its tasks. This is 
more efficient from a resource usage standpoint than pausing the connector, but 
can cause it to take longer to begin processing data once resumed.
+PUT /connectors/{name}/stop - stop the connector and 
shut down its tasks, deallocating any resources claimed by its tasks. This is 
more efficient from a resource usage standpoint than pausing the connector, but 
can cause it to take longer to begin processing data once resumed. Note that 
the offsets for a connector can be modified via the offsets management REST 
APIs only if it is in the stopped state.

Review Comment:
   ```suggestion
   PUT /connectors/{name}/stop - stop the connector 
and shut down its tasks, deallocating any resources claimed by its tasks. This 
is more efficient from a resource usage standpoint than pausing the connector, 
but can cause it to take longer to begin processing data once resumed. Note 
that the offsets for a connector can be only modified via the offsets 
management endpoints if it is in the stopped state.
   ```



##
docs/connect.html:
##
@@ -313,7 +313,13 @@ REST 
API
 DELETE /connectors/{name} - delete a connector, 
halting all tasks and deleting its configuration
 GET /connectors/{name}/topics - get the set of topics 
that a specific connector is using since the connector was created or since a 
request to reset its set of active topics was issued
 PUT /connectors/{name}/topics/reset - send a request 
to empty the set of active topics of a connector
-GET /connectors/{name}/offsets - get the current 
offsets for a connector (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details)
+Offsets management REST APIs (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details):
+
+GET /connectors/{name}/offsets - get the 
current offsets for a connector
+DELETE /connectors/{name}/offsets - reset the 
offsets for a connector. The connector must exist and must be in the stopped 
state.

Review Comment:
   Nit: for consistency, we should leave out the period from the last sentence 
of each item
   ```suggestion
   DELETE /connectors/{name}/offsets - reset 
the offsets for a connector. The connector must exist and must be in the 
stopped state
   ```
   
   Also, would it be possible to link to the docs for the `PUT 
/connectors/{name}/stop` endpoint when we refer to it here?



##
docs/connect.html:
##
@@ -313,7 +313,13 @@ REST 
API
 DELETE /connectors/{name} - delete a connector, 
halting all tasks and deleting its configuration
 GET /connectors/{name}/topics - get the set of topics 
that a specific connector is using since the connector was created or since a 
request to reset its set of active topics was issued
 PUT /connectors/{name}/topics/reset - send a request 
to empty the set of active topics of a connector
-GET /connectors/{name}/offsets - get the current 
offsets for a connector (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details)
+Offsets management REST APIs (see https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect;>KIP-875
 for more details):
+
+GET /connectors/{name}/offsets - get the 
current offsets for a connector
+DELETE /connectors/{name}/offsets - reset the 
offsets for a connector. The connector must exist and must be in the stopped 
state.
+PATCH /connectors/{name}/offsets - alter the 
offsets for a connector. The connector must exist and must be in the stopped 
state. The request body should be a JSON object containing a JSON array 
offsets field, similar to the response body of the GET 
/connectors/{name}/offsets REST API.

Review Comment:
   Same nit RE trailing period, 
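
For readers who want a concrete picture of the endpoints being documented here, below is an illustrative sketch of calling them with Java 11's HttpClient. The worker URL, connector name, and the sink-connector style partition/offset fields in the body are assumptions based on the KIP-875 description, not taken from this PR.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class ConnectOffsetsExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:8083/connectors/my-sink-connector/offsets";

        // PATCH: alter offsets. The body is a JSON object with an "offsets" array,
        // mirroring the response body of GET /connectors/{name}/offsets.
        String body = "{\"offsets\":[{"
                + "\"partition\":{\"kafka_topic\":\"test-topic\",\"kafka_partition\":0},"
                + "\"offset\":{\"kafka_offset\":5}}]}";
        HttpRequest patch = HttpRequest.newBuilder(URI.create(base))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(body))
                .build();
        System.out.println(client.send(patch, HttpResponse.BodyHandlers.ofString()).body());

        // DELETE: reset all offsets for the (stopped) connector.
        HttpRequest delete = HttpRequest.newBuilder(URI.create(base)).DELETE().build();
        System.out.println(client.send(delete, HttpResponse.BodyHandlers.ofString()).body());
    }
}
```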

[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242226466


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2688,20 +2614,8 @@ public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
 .thenReturn(singletonList(task10));
 
 final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
-expect(consumer.groupMetadata()).andReturn(groupMetadata);
-producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-expectLastCall();
-
-task00.committedOffsets();
-EasyMock.expectLastCall();
-task01.committedOffsets();
-EasyMock.expectLastCall();
-task02.committedOffsets();
-EasyMock.expectLastCall();
-task10.committedOffsets();
-EasyMock.expectLastCall();
-
-replay(consumer);
+when(consumer.groupMetadata()).thenReturn(groupMetadata);
+doNothing().when(producer).commitTransaction(expectedCommittedOffsets, 
groupMetadata);

Review Comment:
   Alright, I removed it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242149622


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) 
{
 };
 
 // `handleAssignment`
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-expect(consumer.assignment()).andReturn(taskId00Partitions);
-replay(consumer);
+final Set partitions = union(HashSet::new, 
taskId00Partitions);
+when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   ~~Oh, silly me, there is a mkSet method in Utils. Okay, I will remedy this.~~
   Silly me x2, the thing is already a set



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242148687


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) 
{
 };
 
 // `handleAssignment`
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-expect(consumer.assignment()).andReturn(taskId00Partitions);
-replay(consumer);
+final Set partitions = union(HashSet::new, 
taskId00Partitions);
+when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   ~~Because I couldn't figure out a better way to create a set with only one 
element and Collections does not have a singletonSet  ~~



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) 
{
 };
 
 // `handleAssignment`
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-expect(consumer.assignment()).andReturn(taskId00Partitions);
-replay(consumer);
+final Set partitions = union(HashSet::new, 
taskId00Partitions);
+when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   ~~Because I couldn't figure out a better way to create a set with only one 
element and Collections does not have a singletonSet~~ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242151305


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) 
{
 };
 
 // `handleAssignment`
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-expect(consumer.assignment()).andReturn(taskId00Partitions);
-replay(consumer);
+final Set partitions = union(HashSet::new, 
taskId00Partitions);
+when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   You are correct, it appears I can remove line 1923



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242149622


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) 
{
 };
 
 // `handleAssignment`
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-expect(consumer.assignment()).andReturn(taskId00Partitions);
-replay(consumer);
+final Set partitions = union(HashSet::new, 
taskId00Partitions);
+when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   Oh, silly me, there is a mkSet method in Utils. Okay, I will remedy this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242148687


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1961,10 +1920,10 @@ public void postCommit(final boolean enforceCheckpoint) 
{
 };
 
 // `handleAssignment`
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-expect(consumer.assignment()).andReturn(taskId00Partitions);
-replay(consumer);
+final Set partitions = union(HashSet::new, 
taskId00Partitions);
+when(consumer.assignment()).thenReturn(partitions);

Review Comment:
   Because I couldn't figure out a better way to create a set with only one 
element and Collections does not have a singletonSet  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242144879


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2418,19 +2368,18 @@ public void markChangelogAsCorrupted(final 
Collection partitions
 mkEntry(taskId02, taskId02Partitions)
 );
 
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
 .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, 
unrevokedActiveTaskWithoutCommitNeeded));
 
 final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
-expect(consumer.groupMetadata()).andReturn(groupMetadata);
+when(consumer.groupMetadata()).thenReturn(groupMetadata);
 
 doThrow(new 
TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, 
groupMetadata);
 
-expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
-
-replay(consumer, stateManager);

Review Comment:
    



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2418,19 +2368,18 @@ public void markChangelogAsCorrupted(final 
Collection partitions
 mkEntry(taskId02, taskId02Partitions)
 );
 
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
 .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, 
unrevokedActiveTaskWithoutCommitNeeded));
 
 final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
-expect(consumer.groupMetadata()).andReturn(groupMetadata);
+when(consumer.groupMetadata()).thenReturn(groupMetadata);
 
 doThrow(new 
TimeoutException()).when(producer).commitTransaction(expectedCommittedOffsets, 
groupMetadata);
 
-expect(consumer.assignment()).andStubReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
-
-replay(consumer, stateManager);

Review Comment:
   I will re-add it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242142731


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2469,19 +2416,18 @@ public void 
shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
 assertThat(task00.state(), is(Task.State.CLOSED));
 assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
 assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+verifyResumeWasCalled(consumer);

Review Comment:
   On the same topic, yes, I could inline the setup and if you are okay with it 
I will then inline the verification as well. Let me know your thoughts!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242140474


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2469,19 +2416,18 @@ public void 
shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
 assertThat(task00.state(), is(Task.State.CLOSED));
 assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
 assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+verifyResumeWasCalled(consumer);

Review Comment:
   I will answer here, but the answer applies to all the other places you have 
asked it as well.
   The original code for `expectRestoreToBeCompleted` was:
   ```
   private static void expectRestoreToBeCompleted(final Consumer consumer) {
   final Set assignment = singleton(new 
TopicPartition("assignment", 0));
   expect(consumer.assignment()).andReturn(assignment);
   consumer.resume(assignment);
   expectLastCall();
   }
   ```
   This should in theory be changed to
   ```
   private static void expectRestoreToBeCompleted(final Consumer consumer) {
   final Set assignment = singleton(new 
TopicPartition("assignment", 0));
   when(consumer.assignment()).thenReturn(assignment);
   Mockito.verify(consumer).resume(assignment);
   }
   ```
   However, if we have the verify there it will fail as it needs to be verified 
after the object/method under test is exercised. Hence I moved it to a separate 
function and added it as a verification at the end of the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
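
A minimal sketch of the EasyMock-to-Mockito translation described here: stubbing stays before the code under test, while the interaction check moves into an explicit verify(...) afterwards. The stand-in for the exercised code is hypothetical; in the real test it is the TaskManager.

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class MockitoMigrationSketch {
    @SuppressWarnings("unchecked")
    void example() {
        Consumer<byte[], byte[]> consumer = mock(Consumer.class);
        Set<TopicPartition> assignment = Collections.singleton(new TopicPartition("assignment", 0));

        // Stubbing (EasyMock's expect(...).andReturn(...)) is still set up front.
        when(consumer.assignment()).thenReturn(assignment);

        // Stand-in for exercising the object under test.
        consumer.resume(consumer.assignment());

        // Verification (EasyMock's expectLastCall()/replay()/verify() flow) runs afterwards.
        verify(consumer).resume(assignment);
    }
}
```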



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242132233


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4050,12 +3928,8 @@ public void shouldHaveRemainingPartitionsUncleared() {
 final Map offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
 task00.setCommittableOffsetsAndMetadata(offsets);
 
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-consumer.commitSync(offsets);

Review Comment:
   This is not called, this is why I removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242131444


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3418,16 +3311,12 @@ public void shouldCommitProvidedTasksIfNeeded() {
 mkEntry(taskId05, taskId05Partitions)
 );
 
-expectRestoreToBeCompleted(consumer);
+expectAssignmentToBeCalled(consumer);
 when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
 .thenReturn(Arrays.asList(task00, task01, task02));
 when(standbyTaskCreator.createTasks(assignmentStandby))
 .thenReturn(Arrays.asList(task03, task04, task05));
 
-consumer.commitSync(eq(emptyMap()));

Review Comment:
   This is just not called, that's why I removed it originally



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242121402


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() {
 // `handleAssignment`
 
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
 
-// `tryToCompleteRestoration`
-expect(consumer.assignment()).andReturn(emptySet());
-consumer.resume(eq(emptySet()));
-expectLastCall();
-
-// `shutdown`
-consumer.commitSync(Collections.emptyMap());

Review Comment:
   ~~I don't think there is a need for verification here. The consumer was only 
replayed in EasyMock and the replayed behaviour comes by default in Mockito~~
   This is just not called, that's why I removed it originally



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


clolov commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1242121402


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() {
 // `handleAssignment`
 
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
 
-// `tryToCompleteRestoration`
-expect(consumer.assignment()).andReturn(emptySet());
-consumer.resume(eq(emptySet()));
-expectLastCall();
-
-// `shutdown`
-consumer.commitSync(Collections.emptyMap());

Review Comment:
   I don't think there is a need for verification here. The consumer was only 
replayed in EasyMock and the replayed behaviour comes by default in Mockito



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #13873: KAFKA-14133: Migrate Consumer mock in TaskManagerTest to Mockito

2023-06-26 Thread via GitHub


cadonna commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1241990023


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
 assertEquals(taskManager.notPausedTasks().size(), 0);
 }
 
-private static void expectRestoreToBeCompleted(final Consumer consumer) {
+private static void expectAssignmentToBeCalled(final Consumer consumer) {
 final Set assignment = singleton(new 
TopicPartition("assignment", 0));
-expect(consumer.assignment()).andReturn(assignment);
-consumer.resume(assignment);
-expectLastCall();
+when(consumer.assignment()).thenReturn(assignment);
+}

Review Comment:
   I would inline this function since it became an one-liner.
   
   ```java
   when(consumer.assignment()).thenReturn(singleton(new 
TopicPartition("assignment", 0)));
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2034,12 +1993,9 @@ public void 
shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
 // `handleAssignment`
 when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
 .thenReturn(asList(corruptedTask, nonCorruptedTask));
-expectRestoreToBeCompleted(consumer);
-expect(consumer.assignment()).andReturn(taskId00Partitions);
-// check that we should not commit empty map either
-consumer.commitSync(eq(emptyMap()));
-expectLastCall().andStubThrow(new AssertionError("should not invoke 
commitSync when offset map is empty"));
-replay(consumer);
+expectAssignmentToBeCalled(consumer);
+final Set partitions = union(HashSet::new, 
taskId00Partitions);

Review Comment:
   Same here



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2449,17 +2398,15 @@ public void markChangelogAsCorrupted(final 
Collection partitions
 assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
 assertThat(unrevokedActiveTask.state(), is(State.CREATED));
 assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), 
is(State.RUNNING));
+verifyResumeWasCalledWith(consumer, partitions);

Review Comment:
   Why do you verify? It was not verified in the original code.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2377,6 +2326,7 @@ public void markChangelogAsCorrupted(final 
Collection partitions
 assertThat(revokedActiveTask.state(), is(State.SUSPENDED));
 assertThat(unrevokedActiveTaskWithCommitNeeded.state(), 
is(State.CREATED));
 assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), 
is(State.RUNNING));
+verifyResumeWasCalledWith(consumer, partition);

Review Comment:
   Why do you verify? It was not verified in the original code.
   
   I added a couple of these comments below. However, I am not sure whether the 
original author did not want to verify the consumer or if they forgot about it. 
I will leave it to you if you want to keep them or not.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2805,6 +2716,7 @@ public void 
shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
 
 assertThat(task00.commitNeeded, is(true));
 assertThat(task10.commitPrepared, is(false));
+verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2834,14 +2744,14 @@ public void 
shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
 taskManager.handleAssignment(assignmentActive, Collections.emptyMap());
 
 assertThat(task00.commitNeeded, is(true));
+verifyResumeWasCalled(consumer);

Review Comment:
   Why do you verify? It was not verified in the original code.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -2688,20 +2614,8 @@ public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
 .thenReturn(singletonList(task10));
 
 final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
-expect(consumer.groupMetadata()).andReturn(groupMetadata);
-producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-expectLastCall();
-
-task00.committedOffsets();
-EasyMock.expectLastCall();
-task01.committedOffsets();
-EasyMock.expectLastCall();
-task02.committedOffsets();
-EasyMock.expectLastCall();
-task10.committedOffsets();
-EasyMock.expectLastCall();


[GitHub] [kafka] divijvaidya commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-26 Thread via GitHub


divijvaidya commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1242092544


##
Jenkinsfile:
##
@@ -155,79 +155,23 @@ pipeline {
 echo 'Skipping Kafka Streams archetype test for Java 17'
   }
 }
-
-// To avoid excessive Jenkins resource usage, we only run the stages
-// above at the PR stage. The ones below are executed after changes
-// are pushed to trunk and/or release branches. We achieve this via
-// the `when` clause.
-
-stage('JDK 8 and Scala 2.13') {

Review Comment:
   Adding this note here for future visitors to this PR (and to ensure that we 
are on the same page here about the reason to remove these stages from CI)
   
   Removing this combination from tests is fine because Scala 2.13 is fully 
compatible with JDK 8 [1]. If we can guarantee that the code compiles correctly 
with 2.13 and JDK 8 separately, then that is sufficient and we do not 
necessarily have to test the combination of both.
   
   [1] https://docs.scala-lang.org/overviews/jdk-compatibility/overview.html



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano opened a new pull request, #13916: WIP DelegationToken Support for KRaft

2023-06-26 Thread via GitHub


pprovenzano opened a new pull request, #13916:
URL: https://github.com/apache/kafka/pull/13916

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #12948: MINOR: Add JDK 20 CI build and remove some branch builds

2023-06-26 Thread via GitHub


divijvaidya commented on code in PR #12948:
URL: https://github.com/apache/kafka/pull/12948#discussion_r1242078745


##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -290,25 +290,27 @@ public long skip(long toSkip) throws IOException {
 
 // Skip bytes stored in intermediate buffer first
 int avail = count - pos;
-long bytesSkipped = (avail < remaining) ? avail : remaining;
+int bytesSkipped = Math.min(avail, (int) remaining);

Review Comment:
   I am not sure it is a good idea to downcast and then perform the 
comparison. Comparison amongst different types is performed by promoting them 
to the wider type and then comparing. In the current approach, downcasting 
`remaining` to int is unsafe.
   
   May I suggest an alternative where we widen `avail` to `long` before 
performing the comparison and then convert the result to int: `int bytesSkipped = 
(int) Math.min((long) avail, remaining);`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14972) Make KafkaConsumer usable in async runtimes

2023-06-26 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14972?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-14972:
-

Assignee: Erik van Oosten

> Make KafkaConsumer usable in async runtimes
> ---
>
> Key: KAFKA-14972
> URL: https://issues.apache.org/jira/browse/KAFKA-14972
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Erik van Oosten
>Assignee: Erik van Oosten
>Priority: Major
>
> KafkaConsumer contains a check that rejects nested invocations from different 
> threads (method {{{}acquire{}}}). For users that use an async runtime, this 
> is an almost impossible requirement. Examples of async runtimes that are 
> affected are Kotlin co-routines (see KAFKA-7143) and Zio.
> We propose to replace the thread-id check with an access-id that is stored on 
> a thread-local variable. Existing programs will not be affected. Developers 
> that work in an async runtime can pick up the access-id and set it on the 
> thread-local variable in a thread of their choosing.
> Every time a callback is invoked a new access-id is generated. When the 
> callback completes, the previous access-id is restored.
> This proposal does not make it impossible to use the client incorrectly. 
> However, we think it strikes a good balance between making correct usage from 
> an async runtime possible while making incorrect usage difficult.
> Alternatives considered:
>  # Configuration that switches off the check completely.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
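
A rough sketch of the access-id idea proposed in this ticket; every name below is hypothetical and this is not the actual KafkaConsumer API, just one way the described scheme could look.

```java
import java.util.ConcurrentModificationException;
import java.util.UUID;

public final class AccessIdSketch {
    private static final ThreadLocal<String> CURRENT_ACCESS_ID = new ThreadLocal<>();

    private final long ownerThreadId = Thread.currentThread().getId();
    private volatile String expectedAccessId = UUID.randomUUID().toString();

    // Replaces the strict thread-id check: the owning thread keeps working as before,
    // and any other thread may proceed if it carries the current access id.
    private void acquire() {
        boolean sameThread = Thread.currentThread().getId() == ownerThreadId;
        boolean hasAccessId = expectedAccessId.equals(CURRENT_ACCESS_ID.get());
        if (!sameThread && !hasAccessId) {
            throw new ConcurrentModificationException("Consumer is not safe for multi-threaded access");
        }
    }

    // An async runtime would call this on whichever thread it schedules the next step on.
    public void adoptAccessId() {
        CURRENT_ACCESS_ID.set(expectedAccessId);
    }

    // While a user callback runs, a fresh id is expected; the previous one is restored afterwards.
    public void runCallback(Runnable callback) {
        String previous = expectedAccessId;
        expectedAccessId = UUID.randomUUID().toString();
        try {
            callback.run();
        } finally {
            expectedAccessId = previous;
        }
    }

    public void poll() {
        acquire();
        // ... actual consumer work would go here ...
    }
}
```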


[GitHub] [kafka] mimaison commented on pull request #13914: KAFKA-14972: Support async runtimes in consumer

2023-06-26 Thread via GitHub


mimaison commented on PR #13914:
URL: https://github.com/apache/kafka/pull/13914#issuecomment-1607082695

   Thanks for the PR. This introduces public methods in KafkaConsumer, which is 
part of the public API. So in order to accept this, we first need to vote on a 
KIP. See the process in the [Kafka Improvement 
Proposals](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals)
 page on the wiki.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


